In [None]:
import random
import copy
from tqdm import tqdm

import numpy as np

from scipy.stats import entropy

from sim_utils import (
    encode_age,
    encode_ethnicity,
    encode_geo,
    encode_race,
    encode_sex,
    load_and_check_data,
    load_csv,
)

# none value - when an inquiry for a symptom is not yet done
NONE_VAL = 0
# presence value - indicating a symptom is present
PRES_VAL = 1
# absence value - indicating a symptom is absent
ABS_VAL = -1    


class environment(object):
    """Class for simulating patients interaction for an RL project.

    The simulator is based on a csv file exported from a Synthea simulation.

    """

    def __init__(
        self,
        args,
        patient_filepath,
        train=None,
    ):
        """Init method of the simulator.

        Parameters
        ----------
        args: namespace
            arguments of the main program.
        patient_filepath: str
            path to the csv file containing generated patients
            from Synthea simulator.
        train: boolean
            if the env is used in train mode or not.
        """
        self.args = args
        self.filepath = patient_filepath
        self.use_initial_symptom_flag = not args.no_initial_evidence
        self.max_turns = args.MAXSTEP
        self.action_type = 0
        self.include_turns_in_state = args.include_turns_in_state
        self.include_race_in_state = False
        self.include_ethnicity_in_state = False
        self.use_differential_diagnosis = not args.no_differential
        self.train = args.train if train is None else train
        if not self.max_turns and self.include_turns_in_state:
            raise ValueError(
                "'max_turns' could not be None/0 if 'include_turns_in_state' is True."
            )

        # load patient data
        [
            rb,
            self.unique_symptoms,
            self.unique_pathos,
            patho_symptoms,
            self.unique_races,
            self.unique_ethnics,
            self.unique_genders,
            self.symptoms_with_multiple_answers,
            self.max_differential_len,
            self.unique_differential_pathos,
            self.unique_init_symptoms,
        ] = load_csv(self.filepath)

        if len(rb) == 0:
            raise ValueError("The provided file contains no valid patients.")

        # convert the patho_symptoms into string format
        patho_symptoms = {
            self.unique_pathos[a]: set(
                [self.unique_symptoms[b] for b in patho_symptoms[a]]
            )
            for a in patho_symptoms.keys()
        }

        # load and check symptoms and pathology
        self._load_and_check_symptoms_with_pathos(
            args.evi_meta_path,
            self.unique_symptoms,
            self.symptoms_with_multiple_answers,
            args.patho_meta_path,
            self.unique_pathos,
            patho_symptoms,
        )

        # number of demographics features (age, sex, race, ethnic)
        # the first 4 entries corespond respectively to:
        #    Age[0-7], Sex[0-1], Race[0-4], Ethnicity[0-1]
        self.num_demo_features = 10
        low_demo_values = [0] * self.num_demo_features
        high_demo_values = [1] * self.num_demo_features
        self.num_age_values = 8
        self.num_sex_values = 2
        self.num_race_values = 0
        self.num_ethnic_values = 0
        if self.include_race_in_state:
            self.num_demo_features += 5
            low_demo_values += [0] * 5
            high_demo_values += [1] * 5
            self.num_race_values = 5
        if self.include_ethnicity_in_state:
            self.num_demo_features += 2
            low_demo_values += [0] * 2
            high_demo_values += [1] * 2
            self.num_ethnic_values = 2
        if self.include_turns_in_state:
            low_demo_values = [0] + low_demo_values
            high_demo_values = [1] + high_demo_values
            self.num_demo_features += 1

        # define the action and observation spaces
        self.num_symptoms = len(self.symptom_index_2_key)
        self.num_pathos = len(self.pathology_index_2_key)
        if not (self.num_symptoms > 0 and self.num_pathos > 0):
            raise ValueError(
                "Either the number of symptoms or the number of pathologies is null."
            )
        self._define_action_and_observation_spaces(
            self.num_symptoms,
            self.num_pathos,
            self.num_demo_features,
            low_demo_values,
            high_demo_values,
            np.float32,
        )

        # turns in the interaction process (dialog)
        self.turns = 0

        self.context_size = (
            self.num_age_values
            + self.num_sex_values
            + self.num_race_values
            + self.num_ethnic_values
            + (1 if self.include_turns_in_state else 0)
        )
        self.sample_size = len(rb)
        self.idx = 0
        self.indexes = np.arange(self.sample_size)
        self.diag_size = self.num_pathos
        self.symptom_size = self.num_symptoms
        self.state_size = self.num_features - len(high_demo_values) + self.context_size
        self.cached_patients = {}
        self.cost = 1 * np.ones(self.symptom_size)
        self.earn = 1 * np.ones(self.symptom_size)
        self.action_mask = np.zeros((self.num_symptoms, self.state_size))
        self.symptom_mask = np.zeros((1, self.num_symptoms))
        self.atcd_mask = np.zeros((1, self.num_symptoms))
        for symptom_index in range(self.num_symptoms):
            start_idx = self.symptom_to_obs_mapping[symptom_index][0]
            end_idx = self.symptom_to_obs_mapping[symptom_index][1]
            self.action_mask[symptom_index, start_idx:end_idx] = 1
            is_atcd = self.symptom_data[self.symptom_index_2_key[symptom_index]].get("is_antecedent", False)
            if is_atcd:
                self.atcd_mask[0, symptom_index] = 1
            else:
                self.symptom_mask[0, symptom_index] = 1
        self.severity_mask = np.zeros((self.num_pathos,))
        severity_threshold = 3
        for patho_index in range(self.num_pathos):
            if self.pathology_data[self.pathology_index_2_key[patho_index]].get("urgence", severity_threshold) < severity_threshold:
                self.severity_mask[patho_index] = 1
        self._put_patients_data_in_cache(rb)


    

感谢您分享 env.py 文件。它似乎是一个包含环境类的模块，它模拟 RL 项目的患者交互。以下是文件内容的细分： 导入语句： 导入随机模块以生成随机值。 复制模块被导入用于创建对象的副本。 引入了 tqdm 模块，用于在迭代期间显示进度条。 导入 numpy 模块用于数值运算。 scipy.stats 模块被导入用于熵计算。 导入了 sim_utils.py 中的各种实用函数。 常量： NONE_VAL、PRES_VAL 和 ABS_VAL 是常量，表示用于指示症状存在或不存在的值。 类别定义：环境 此类代表患者交互的模拟器。 构造函数 (__init__) 使用提供的参数和文件路径初始化模拟器。 该类具有各种属性来存储配置设置和患者数据。 类方法实现加载和检查数据、定义操作和观察空间、更新患者数据以及模拟患者交互等功能。 患者资料： 使用 sim_utils.py 中的 load_csv 函数从提供的 CSV 文件加载患者数据。 加载的数据包括有关症状、病理、种族、种族、性别和其他元数据的信息。 数据被处理并存储在环境类中的适当变量中。 行动和观察空间： num_symptoms 和 num_pathos 属性分别存储独特症状和病理的数量。 类方法 _define_action_and_observation_spaces 负责根据症状、病理和人口特征的数量定义动作和观察空间。 模拟器功能： 环境类提供了模拟患者交互的方法。 这些方法包括更新患者数据、生成观察结果、选择动作、计算奖励以及跟踪交互进度。 如果您需要进一步说明，或者您想更详细地探索代码的特定部分，请告诉我。

In [None]:
    def reset(self):
        self.idx = 0
        self.turns = 0
        if self.train:
            np.random.shuffle(self.indexes)
        

    def _convert_to_aarlc_format(self, rb_data):
        age = rb_data["AGE_BEGIN"]
        race = self.unique_races[rb_data["RACE"]]
        sex = self.unique_genders[rb_data["GENDER"]]
        ethnic = self.unique_ethnics[rb_data["ETHNICITY"]]
        pathology = self.unique_pathos[rb_data["PATHOLOGY"]]
        pathology_index = self.pathology_name_2_index[pathology]
        pathology_severity = self.pathology_severity_data[pathology_index]
        symptoms = [self.unique_symptoms[a] for a in rb_data["SYMPTOMS"]]
        initial_symptom = (
            None
            if len(self.unique_init_symptoms) == 0
            else self.unique_init_symptoms[rb_data["INITIAL_SYMPTOM"]]
        )
        differential_data = (
            None
            if (self.max_differential_len == -1 or not self.use_differential_diagnosis)
            else {
                self.pathology_name_2_index[
                    self.unique_differential_pathos[int(diff_data[0])]
                ]: [
                    diff_data[1],
                    diff_data[2],
                ]
                for diff_data in rb_data["DIFFERNTIAL_DIAGNOSIS"]
                if self.unique_differential_pathos[int(diff_data[0])]
                in self.pathology_name_2_index
            }
        )
        out_diff = self._compute_differential_probs(differential_data)
        differential_indices, differential_probas = out_diff
        target_state = np.ones((self.state_size), dtype=self.obs_dtype) * ABS_VAL
        if self.include_turns_in_state:
            target_state[0] = 0
        target_state = self._init_demo_features(target_state, age, sex, race, ethnic)
        binary_symptoms, present_symptoms, target_state = self.parse_target_patients(
                symptoms, target_state
        )
        result = {}
        result['bin_sym'] = binary_symptoms
        result['pres_sym'] = present_symptoms
        result['age'] = age
        result['race'] = race
        result['sex'] = sex
        result['ethnic'] = ethnic
        result['pathology_index'] = pathology_index
        result['pathology_severity'] = pathology_severity
        result['initial_symptom'] = self.symptom_name_2_index[initial_symptom]
        result['tgt_state'] = target_state
        result['differential_indices'] = differential_indices
        result['differential_probas'] = differential_probas
        return result
        



您提供的代码片段在环境类中包含以下方法： reset(self)：此方法用于将环境重置为其初始状态。它设置索引 (idx) 并将转 (turns) 为 0。如果环境处于训练模式，它会打乱患者数据的索引。 _convert_to_aarlc_format(self, rb_data)：此方法将患者数据转换为 AaRLC（控制中的自适应和自主强化学习）框架中使用的格式。它采用包含患者信息的字典 rb_data 并将其转换为 AaRLC 格式的字典。 以下是转换过程的细分： 从 rb_data 中提取年龄、种族、性别、民族、病理和病理严重程度。 从 rb_data 中检索症状并将它们转换为相应的索引。 处理初始症状（如果有）。 处理鉴别诊断数据，将其转换为适当的格式。 初始化目标状态并设置人口统计特征、症状和初始症状。 计算目标状态的二进制和当前症状数组。 返回包含 AaRLC 格式的转换后患者信息的字典。 _convert_to_aarlc_format 方法似乎负责为强化学习代理准备患者数据。 如果您有任何进一步的问题或需要额外的解释，请告诉我！

In [None]:
    def _put_patients_data_in_cache(self, rb):
        patients = rb.apply(lambda row_data: self._convert_to_aarlc_format(row_data), axis="columns").to_list()
        self.cached_patients = {idx: patient_data for idx, patient_data in enumerate(patients)}


    def parse_target_patients(self, symptomPat, target_state):
        binary_symptoms = []
        present_symptoms = []
        symptoms_not_listed = set(self.all_symptom_names) - set([self.get_symptom_and_value(symptom_name)[0] for symptom_name in symptomPat])
        considered_symptoms = set(symptomPat + list(symptoms_not_listed))
        for symptom_name in considered_symptoms:
            root_sympt_name, symp_val = self.get_symptom_and_value(symptom_name)
            symptom_index = self.symptom_name_2_index[root_sympt_name]
            symptom_key = self.symptom_index_2_key[symptom_index]
            # is it part of target
            is_present = symptom_name in symptomPat
            data_type = self.symptom_data_types[symptom_index]
            # first symptom should not be an antecedent
            is_antecedent = self.symptom_data[symptom_key].get("is_antecedent", False)
            if data_type == "B" and is_present and (not is_antecedent):
                binary_symptoms.append(symptom_index)

            # use default value if not present
            if data_type != "B" and not is_present:
                symp_val = self.symptom_default_value_mapping.get(symptom_index)

            f_i = self._from_symptom_index_to_frame_index(symptom_index, symp_val)

            if is_present:
                if data_type == "B":
                    present_symptoms.append(symptom_index)
                else:
                    default_value = self.symptom_default_value_mapping[symptom_index]
                    if not (str(default_value) == str(symp_val)):
                        present_symptoms.append(symptom_index)

            if data_type == "B":
                target_state[f_i] = PRES_VAL if is_present else ABS_VAL
            elif data_type == "M":
                target_state[f_i] = PRES_VAL
            else:
                # data_type == "C"
                if not (symptom_index in self.categorical_integer_symptoms):
                    target_state[f_i] = PRES_VAL
                else:
                    val_index = self.symptom_possible_val_mapping[symptom_index][
                        symp_val
                    ]
                    # rescale to 1
                    num = len(self.symptom_possible_val_mapping[symptom_index])
                    scaled = NONE_VAL + ((PRES_VAL - NONE_VAL) * (val_index + 1) / num)
                    target_state[f_i] = scaled

        present_symptoms = list(set(present_symptoms))
        return binary_symptoms, present_symptoms, target_state


    

您提供的代码片段在环境类中包含两个额外的方法： _put_patients_data_in_cache(self, rb)：此方法处理患者数据并将其存储在 cached_pa​​tients 字典中。它将 _convert_to_aarlc_format 方法应用于 rb DataFrame 的每一行，将患者数据转换为 AaRLC 格式。生成的患者数据存储在 cached_pa​​tients 字典中，其中键是索引，值是患者数据。 parse_target_patients(self, symptomPat, target_state)：该方法负责解析目标患者的症状并相应地更新目标状态。它以症状模式列表 (symptomPat) 和目标状态数组 (target_state) 作为输入。 该方法遍历所考虑的症状（symptomPat 中存在或缺失的症状）并对每个症状执行以下步骤： 使用 symptom_name_2_index 和 symptom_index_2_key 映射检索症状索引和键。 确定症状是否存在 (is_present) 在 symptomPat 中。 检查症状的数据类型 (data_type) 以及它是否是前提 (is_antecedent)。 如果数据类型是二进制（“B”）并且症状存在但不是前提，则将其添加到 binary_symptoms 列表中。 如果数据类型不是二进制并且存在症状，则将其添加到 present_symptoms 列表中。 根据症状类型和值更新目标状态数组 (target_state)。 对于二进制症状，如果症状存在，则目标状态设置为 PRES_VAL，否则设置为 ABS_VAL。 对于多值症状，如果存在症状，则目标状态设置为 PRES_VAL。 对于分类症状，目标状态是根据症状值设置的，重新调整为 NONE_VAL 和 PRES_VAL 之间的范围。 该方法返回 binary_symptoms、present_symptoms 和更新的 target_state。 这些方法涉及处理和组织患者数据，为与强化学习代理交互做准备。 如果您有任何进一步的问题或需要额外的解释，请告诉我

In [None]:
def initialize_state(self, batch_size):
        self.batch_size = batch_size
        self.batch_index = self.indexes[self.idx : self.idx+batch_size]
        self.idx += batch_size
        self.disease = []
        self.differential_indices = []
        self.differential_probas = []
        self.disease_severity = []
        self.pos_sym = []
        self.acquired_sym = []
        
        i = 0
        init_state = np.ones((batch_size, self.state_size), dtype=self.obs_dtype) * NONE_VAL
        self.target_state = np.ones((batch_size, self.state_size), dtype=self.obs_dtype) * ABS_VAL
        self.all_state = np.zeros((batch_size, self.symptom_size))
        self.target_differential = np.zeros((batch_size, self.diag_size))
        self.inquired_symptoms = np.zeros((batch_size, self.symptom_size))

        if self.include_turns_in_state:
            # normalize number of turns
            init_state[:, 0] = 0
            self.target_state[:, 0] = 0

        for item in self.batch_index:
            age = self.cached_patients[item]['age']
            race = self.cached_patients[item]['race']
            sex = self.cached_patients[item]['sex']
            ethnic = self.cached_patients[item]['ethnic']
            initial_symptom = self.cached_patients[item]['initial_symptom']
            pathology_index = self.cached_patients[item]['pathology_index']
            pathology_severity = self.cached_patients[item]['pathology_severity']
            differential_indices = copy.deepcopy(self.cached_patients[item]['differential_indices'])
            differential_probas = copy.deepcopy(self.cached_patients[item]['differential_probas'])

            binary_symptoms = self.cached_patients[item]['bin_sym']
            present_symptoms = self.cached_patients[item]['pres_sym']
            self.target_state[i, :] = self.cached_patients[item]['tgt_state'][:]

            init_state[i, :] = self._init_demo_features(init_state[i, :], age, sex, race, ethnic)
            self.disease.append(pathology_index)
            self.disease_severity.append(pathology_severity)
            self.differential_indices.append(differential_indices)
            self.differential_probas.append(differential_probas)

            # reset binary symptoms if initial symptom is from the dataset
            binary_symptoms = (
                [initial_symptom]
                if initial_symptom is not None and self.use_initial_symptom_flag
                else binary_symptoms
            )

            # only select as first indicator binary symptoms
            assert len(binary_symptoms) > 0
            first_symptom = random.choice(binary_symptoms)

            index_first_symptom = first_symptom
            first_action = self._from_symptom_index_to_inquiry_action(index_first_symptom)
            frame_index, _ = self._from_inquiry_action_to_frame_index(first_action)
            init_state[i, frame_index] = PRES_VAL

            self.all_state[i, present_symptoms] = 1
            self.inquired_symptoms[i, index_first_symptom] = 1
            if (differential_indices is not None) and (differential_probas is not None):
                self.target_differential[i, differential_indices[differential_indices != -1]] = differential_probas[differential_indices != -1]
            else:
                self.target_differential[i, pathology_index] = 1.0
            
            i += 1

        self.disease = np.array(self.disease)
        self.disease_severity = np.array(self.disease_severity)
        if len(self.differential_indices) > 0 and self.differential_indices[0] is None:
            self.differential_indices = None
            self.differential_probas = None
        else:
            self.differential_indices = np.array(self.differential_indices)
            self.differential_probas = np.array(self.differential_probas)

        return init_state, self.disease, self.differential_indices, self.differential_probas, self.disease_severity




您提供的代码片段包括环境类中的 initialize_state 方法。该方法负责为一批患者初始化环境状态。 以下是该方法的细分： 初始化变量：初始化各种变量和数组来存储该批次患者的状态信息，包括self.batch_size、self.batch_index、self.disease、self.differential_indices、self.differential_probas、self.disease_severity、self.pos_sym、self. acquired_sym、init_state、self.target_state、self.all_state、self.target_differential 和 self.inquired_symptoms。 迭代批次索引：迭代批次索引中的每个项目以处理患者数据。 检索患者数据：从 cached_pa​​tients 字典中检索 batch 索引中当前项的相关患者数据，包括 age、race、sex、ethnic、initial_symptom、pathology_index、pathology_severity、differential_indices、differential_probas、binary_symptoms、present_symptoms 和 target_state。 更新状态数组：根据检索到的患者数据更新 init_state、self.target_state、self.all_state、self.inquired_symptoms 和 self.target_differential 数组。 选择第一个症状：从二元症状列表中选择第一个症状作为第一个指标。更新 init_state 数组以指示此症状的存在。 更新状态变量：使用当前患者的相应值更新状态变量（self.disease、self.disease_severity、self.differential_indices、self.differential_probas）。 将变量转换为 NumPy 数组：将 self.disease、self.disease_severity、self.differential_indices 和 self.differential_probas 列表转换为 NumPy 数组（如果它们包含有效值）。 返回初始化状态：返回初始化状态变量（init_state、self.disease、self.differential_indices、self.differential_probas、self.disease_severity）作为方法的输出。 initialize_state 方法为一批患者准备环境的初始状态，考虑他们的人口统计信息、症状和鉴别诊断数据。 如果您有任何进一步的问题或需要额外的解释，请告诉我！

In [None]:
    def step(self, s, a_p, done, right_diagnosis, agent, ent_init, threshold, ent):

        s_ = copy.deepcopy(s)
        ent_ = copy.deepcopy(ent)
        s_[~done] = (1 - self.action_mask[a_p[~done]]) * s_[~done] + self.action_mask[a_p[~done]] * self.target_state[~done]
        self.turns += 1
        if self.include_turns_in_state:
            # normalize number of turns
            s_[~done, 0] = self.turns / self.max_turns

        a_d_, p_d_ = agent.choose_diagnosis(s_)

        ent_[~done] = entropy(p_d_[~done], axis = 1)
        ent_ratio = (ent-ent_) / ent_init

        diag = (ent_ < threshold[a_d_]) & (~done)
        right_diag = (a_d_ == np.array(self.disease)) & diag 

        reward_s = self.args.mu * self.reward_func(s[:,self.context_size:], s_[:,self.context_size:], diag, a_p) 
        reward_s[ent_ratio > 0] += (self.args.nu * ent_ratio[ent_ratio > 0])
        reward_s[diag] -= (self.args.mu * 1)
        reward_s[right_diag] += (self.args.mu * 2)
        reward_s[done] = 0

        self.inquired_symptoms[~done, a_p[~done]] = 1
        
        done += diag
        right_diagnosis += right_diag
        
        return s_, reward_s, done, right_diagnosis, diag, ent_, a_d_
    
    def reward_func(self, s, s_, diag, a_p):
        
        reward = -self.cost[a_p]
        already_inquired_action = self.inquired_symptoms[range(self.inquired_symptoms.shape[0]), a_p]
        unrepeated_action = (1 - already_inquired_action)
        reward += unrepeated_action * self.cost[a_p] * 0.7
        positive_action = self.all_state[range(self.all_state.shape[0]), a_p]
        unrepeated_positive_action = positive_action * unrepeated_action
        reward += unrepeated_positive_action * self.earn[a_p]

        return reward


    

您提供的代码片段包括环境类中的 step 方法和 reward_func 方法。 step 方法负责在给定动作的环境中迈出一步并更新状态，而 reward_func 方法计算给定状态转换的奖励。 下面是step方法的解释： 创建变量副本：创建输入状态 s 和熵值 ent 的深层副本，以防止它们被修改。 更新状态：通过应用动作掩码来更新未完成患者的状态 s_ 以保留不允许的动作的现有状态值，并将状态值替换为所选动作的目标状态值 (a_p)。此步骤确保仅针对未完成的患者更新状态并遵守操作掩码。 增加转数：将转数计数器 self.turns 加 1。 归一化状态中的转数：如果环境包括状态中的转数（self.include_turns_in_state 为真），通过将 self.turns 除以 self.max_turns 来归一化状态中的转数。 选择诊断动作：调用agent的choose_diagnosis方法得到诊断动作a_d_和对应的概率p_d_。 计算熵：根据概率p_d_计算未完成患者的熵ent_。 计算熵比：将熵比 ent_ratio 计算为初始熵 (ent_init) 与更新熵 (ent_) 之差除以 ent_init。 确定诊断：确定熵低于阈值 (threshold[a_d_]) 和未完成的患者 (diag)。 确定正确的诊断：确定诊断操作 (a_d_) 与基本事实诊断 (np.array(self.disease)) 匹配的患者 (right_diag) 以及诊断 (diag) 的一部分。 计算奖励：根据奖励函数reward_func和输入s、s_、diag、a_p计算奖励reward_s。奖励函数考虑了行动的成本 (self.cost[a_p])、积极行动的奖励 (self.earn[a_p]) 以及对重复或已经查询的行动的惩罚。 更新查询症状：通过将所选操作 (a_p) 标记为未完成患者的查询来更新 self.inquired_symptoms 数组。 更新 done 和 right 诊断计数器：通过分别递增 diag 和 right_diag 中患者的值来更新 done 和 right_diagnosis 数组。 返回更新的状态、奖励、完成、正确的诊断、诊断标志（diag）、更新的熵（ent_）和诊断动作（a_d_）。 reward_func 方法根据提供的输入计算给定状态转换的奖励。它考虑了行动的成本、积极行动的奖励以及重复或已经询问的行动的惩罚。奖励是通过减去动作成本 (self.cost[a_p]) 并加上未重复或已查询的积极动作的奖励 (self.earn[a_p]) 来计算的。 如果您有任何进一步的问题或需要额外的解释，请告诉我！

In [None]:
    def _define_action_and_observation_spaces(
        self,
        num_symptoms,
        num_pathos,
        num_demo_features,
        low_demo_values,
        high_demo_values,
        obs_dtype,
    ):
        """ Utility function for defining the enviroment action and observation spaces.

        It define the action and observation spaces for this Gym environment.

        Parameters
        ----------
        num_symptoms: int
            number of possible symptoms.
        num_pathos: int
            number of possible pathos.
        num_demo_features: int
            number of features corresponditing to demographic data.
        low_demo_values: list
            low values for demographic features.
        high_demo_values: list
            high values for demographic features.
        obs_dtype: dtype
            dtype of the observation data.

        Returns
        -------
        None
        """

        if self._has_int_action():
            num_actions = num_symptoms + num_pathos
            self.num_actions = [num_actions]
        else:
            self.num_actions = [2, num_symptoms, num_pathos]

        msg = "the length of low/high_demo_values must match num_demo_features."
        assert len(low_demo_values) == len(high_demo_values) == num_demo_features, msg

        # low and high values of each entry of the observation/state space
        low_val = list(low_demo_values)
        high_val = list(high_demo_values)

        # low and high values of each entry of the observation/state space
        # dedicated to symptoms
        symp_low_val = []
        symp_high_val = []

        # mapping the symptom index to the [start, end] indices in the obs data
        symptom_to_obs_mapping = {}

        # mapping the symptom index to possible values
        symptom_possible_val_mapping = {}

        # mapping the symptom index to the symptom types
        symptom_data_types = {}

        # mapping the symptom index to the symptom default value
        symptom_default_value_mapping = {}

        # integer based categorical value
        categorical_integer_symptoms = set()

        for idx in range(len(self.symptom_index_2_key)):
            key = self.symptom_index_2_key[idx]
            data_type = self.symptom_data[key].get("type-donnes", "B")
            possible_values = self.symptom_data[key].get("possible-values", [])
            default_value = self.symptom_data[key].get("default_value", None)
            start_obs_idx = len(symp_low_val) + num_demo_features
            symptom_data_types[idx] = data_type
            num_elts = len(possible_values)
            if num_elts > 0:
                assert default_value in possible_values

            if data_type == "B":
                # binary symptom
                symp_low_val.append(min(NONE_VAL, PRES_VAL, ABS_VAL))
                symp_high_val.append(max(NONE_VAL, PRES_VAL, ABS_VAL))
                symptom_to_obs_mapping[idx] = [start_obs_idx, start_obs_idx + 1]
            elif data_type == "C":
                # categorical symptom
                assert num_elts > 0
                if isinstance(possible_values[0], str):
                    for k in range(num_elts):
                        symp_low_val.append(min(NONE_VAL, PRES_VAL, ABS_VAL))
                        symp_high_val.append(max(NONE_VAL, PRES_VAL, ABS_VAL))
                    symptom_to_obs_mapping[idx] = [
                        start_obs_idx,
                        start_obs_idx + num_elts,
                    ]
                else:
                    # integer value
                    categorical_integer_symptoms.add(idx)
                    symp_low_val.append(min(NONE_VAL, PRES_VAL))
                    symp_high_val.append(max(NONE_VAL, PRES_VAL))
                    symptom_to_obs_mapping[idx] = [start_obs_idx, start_obs_idx + 1]
                symptom_possible_val_mapping[idx] = {
                    a: i for i, a in enumerate(possible_values)
                }
                symptom_default_value_mapping[idx] = default_value
            elif data_type == "M":
                # multi-choice symptom
                assert num_elts > 0
                for k in range(num_elts):
                    symp_low_val.append(min(NONE_VAL, PRES_VAL, ABS_VAL))
                    symp_high_val.append(max(NONE_VAL, PRES_VAL, ABS_VAL))
                symptom_to_obs_mapping[idx] = [start_obs_idx, start_obs_idx + num_elts]
                symptom_possible_val_mapping[idx] = {
                    a: i for i, a in enumerate(possible_values)
                }
                symptom_default_value_mapping[idx] = default_value
            else:
                raise ValueError(
                    f"Symptom key: {key} - Unknown data type: {data_type}."
                )

        low_val.extend(symp_low_val)
        high_val.extend(symp_high_val)
        self.obs_dtype = obs_dtype
        self.num_features = len(low_val)
        self.symptom_to_obs_mapping = symptom_to_obs_mapping
        self.symptom_possible_val_mapping = symptom_possible_val_mapping
        self.symptom_data_types = symptom_data_types
        self.symptom_default_value_mapping = symptom_default_value_mapping
        self.categorical_integer_symptoms = categorical_integer_symptoms

    

_define_action_and_observation_spaces 方法是环境类中的一个实用函数。它负责定义环境的动作和观察空间。下面是函数的解释： 确定动作数量：如果环境具有整数动作空间，则动作数量设置为症状数量和病理数量之和 (num_symptoms + num_pathos)。否则，将操作数设置为包含 2（用于在症状和病理之间进行选择）、症状数和病理数（[2, num_symptoms, num_pathos]）的列表。 验证输入：检查 low_demo_values 和 high_demo_values 的长度是否与人口特征的数量 (num_demo_features) 匹配。如果它们不匹配，则会引发断言错误。 定义低值和高值：创建列表 low_val 和 high_val 以存储观察/状态空间的每个条目的低值和高值。分别使用 low_demo_values 和 high_demo_values 中的值初始化它们。 定义特定于症状的值：初始化空列表 symp_low_val 和 symp_high_val 以存储专门针对症状的低值和高值。这些列表将附加基于症状数据类型的值。 定义症状索引的映射：初始化字典 symptom_to_obs_mapping、symptom_possible_val_mapping、symptom_data_types 和 symptom_default_value_mapping 以存储每个症状索引的映射。这些映射将用于确定每个症状的相应观察指标、可能值、数据类型和默认值。 确定数据类型并处理不同的情况：遍历症状索引并获得相应的症状数据。确定症状的数据类型 (data_type) 和可能的值 (possible_values)。处理不同的数据类型如下： 二元症状 (data_type == "B")：分别将最小值和最大值 (min(NONE_VAL, PRES_VAL, ABS_VAL)) 附加到 symp_low_val 和 symp_high_val。使用症状的观察指数更新 symptom_to_obs_mapping。 分类症状 (data_type == "C")：检查可能值的数量 (num_elts) 是否大于 0。如果可能值是字符串，则将每个可能值的最小值和最大值分别附加到 symp_low_val 和 symp_high_val。如果可能的值为整数，则将最小值和最大值 (min(NONE_VAL, PRES_VAL)) 分别附加到 symp_low_val 和 symp_high_val。使用症状的观察指数更新 symptom_to_obs_mapping。将可能值和默认值分别存储在 symptom_possible_val_mapping 和 symptom_default_value_mapping 字典中。 多选症状（data_type == "M"）：将多选症状的每个选择的最小值和最大值分别附加到 symp_low_val 和 symp_high_val。使用症状的观察指数更新 symptom_to_obs_mapping。将可能值和默认值分别存储在 symptom_possible_val_mapping 和 symptom_default_value_mapping 字典中。 更新低值和高值：分别使用来自 symp_low_val 和 symp_high_val 的值扩展 low_val 和 high_val 列表。这些列表现在代表观察/状态空间的每个条目的低值和高值，包括人口统计特征和症状。存储附加信息：存储观测数据类型（obs_dtype）、特征数量（num_features）、症状到观测映射（symptom_to_obs_mapping）、症状可能值映射（symptom_possible_val_mapping）、症状数据类型（symptom_data_types）、症状默认值映射 (symptom_default_value_mapping)，以及具有分类整数值 (categorical_integer_symptoms) 作为环境属性的症状指数集。 总体而言，此效用函数有助于根据提供的参数和症状数据定义环境的操作和观察空间。 如果您有任何其他问题，请随时提问！

In [None]:
    def get_symptom_to_observation_mapping(self):
        """Utility to get the index range in the state space associated to each symptom.

        Utility function to get the mapping from all symptom indices to index ranges
        associated to those symptoms in the observation space.

        Parameters
        ----------

        Returns
        -------
        result: dict
            the mapping from symptom index to index range associated to
            that symptom in the observation space.

        """
        return self.symptom_to_obs_mapping

    def get_pathology_severity(self):
        """Utility to get the severity associated to each pathology.

        Parameters
        ----------

        Returns
        -------
        result: list
            the severity associated to each pathology. The pathologies are
            identified by their index in the list.

        """
        return self.pathology_severity_data

    def get_evidence_default_value_in_obs(self):
        """Utility to get the evidence default value in observation frame.

        Parameters
        ----------

        Returns
        -------
        result: list of tuple (pos, value) where the ith entru correspond
            to the position and the value in the observation frame informing
            that the ith symptom is missing.

        """
        return self.symptom_defaul_in_obs

    def _has_int_action(self):
        """Utility to check if the actions are of type int or tuple of int.

        Parameters
        ----------

        Returns
        -------
        result: bool
            True if actions are of type int.

        """
        return self.action_type == 0

    def _has_multi_choice_symptoms(self, symptom_data):
        """Utility to check if the simulator support multi-choice symptoms.

        It checks if the the provided symptom data contains
        symptoms associated with multi-choice answers.
        E.g.: `douleurxx_endroits` which coresspond to the body parts
        where you have pain.

        Parameters
        ----------
        symptom_data: dict
            the json dict describing the symptom data provided to the
            simulator.

        Returns
        -------
        result: bool
            True if the symptom data contains multi-choice symptoms.

        """
        for k in symptom_data.keys():
            if symptom_data[k].get("type-donnes", "B") == "M":
                return True
        return False

    def get_hierarchical_symptom_dependency(self):
        """Get the groups of symptoms that depend on some master symptoms.

        Returns
        -------
        result: dict
            dictionnary of the groups of dependent symptom indices. The key of
            this dictionary is the index of the master symptom.
        """
        return self.all_linked_symptom_indices

    def _get_linked_symptoms(self, symptom_data):
        """Get the groups of symptoms that are linked together.

        Symptoms are linked together if they share the same code_question.

        Parameters
        ----------
        symptom_data: dict
            dictionary representing the loaded symptom JSON file.

        Returns
        -------
        result: dict
            dictionnary of the groups of linked symptoms. The key of
            this dictionary is the base symptom.

        """
        result = {}
        for k in symptom_data.keys():
            code_question = symptom_data[k].get("code_question", k)
            if code_question not in result:
                result[code_question] = []
            result[code_question].append(k)

        # Eliminate entries with just one element in the list as
        # those are independent questions.
        # Also, retrieve the base question (it is the one equals to code_question)
        # and eliminate it from the related list (it already serves as key)
        all_keys = list(result.keys())
        for k in all_keys:
            if len(result[k]) == 1:
                result.pop(k)
            else:
                assert k in result[k]
                result[k].remove(k)

        # return the computed map
        return result

   

提供的代码包括环境类中的几个实用函数。下面是对每个函数的解释： get_symptom_to_observation_mapping：此函数返回从每个症状索引到观察空间中与该症状关联的索引范围的映射。它检索环境的 symptom_to_obs_mapping 属性。 get_pathology_severity：此函数返回与每个病理相关的严重性值列表。病理由列表中的索引标识。它检索环境的 pathology_severity_data 属性。 get_evidence_default_value_in_obs：此函数返回元组列表，其中每个元组包含观察框中指示缺失症状的位置和值。它检索环境的 symptom_defaul_in_obs 属性。 _has_int_action：此实用函数检查环境中的操作是否为 int 类型。如果操作类型为 0，则返回 True。 _has_multi_choice_symptoms：此实用函数检查提供给模拟器的症状数据是否包含与多选答案相关的症状。它遍历症状数据并在遇到数据类型为“M”（多选）的症状时返回 True。 get_hierarchical_symptom_dependency：此函数返回依赖于某些主要症状的症状组的字典。字典的关键字是主症候群的索引。它检索环境的 all_linked_symptom_indices 属性。 _get_linked_symptoms：此实用程序函数检索基于共享相同 code_question 链接在一起的症状组。它将症状数据作为输入并返回一个字典，其中键是基本症状，值是相关症状的列表。独立问题和基本问题本身将从结果中消除。 这些实用函数提供了方便的方法来访问和检索与环境相关的各种信息，例如映射、严重性数据、默认值和症状依赖性。 如果您还有其他问题，请随时提问！

In [None]:
 def _load_and_check_symptoms_with_pathos(
        self,
        symptom_filepath,
        unique_symptoms,
        symptoms_with_multiple_answers,
        condition_filepath,
        unique_pathos,
        patho_symptoms,
    ):
        """Check symptom/condition JSON file validity against the provided patient file.

        It loads the symptom/pathology data and check
        if they are compliant with the ones defined
        in the `unique_symptoms`/`unique_pathos` provided
        in the patient file.

        Parameters
        ----------
        symptom_filepath: str
            path to a json file containing the symptom data.
        unique_symptoms: list
            a list of unique symptoms within the provided patient data.
        symptoms_with_multiple_answers: list
            a list of unique symptoms with multiple answer within the provided
            patient data.
        condition_filepath: str
            path to a json file containing the pathology data.
        unique_pathos: list
            a list of unique pathologies within the provided patient data.
        patho_symptoms: dict
            a mapping from a pathology to a set of symptoms describing that
            pathology as derived from the patient data.

        Returns
        -------
        None

        """

        # load symptoms
        symptom_infos = load_and_check_data(
            symptom_filepath, unique_symptoms, key_name="name"
        )
        self.symptom_index_2_key = symptom_infos[0]
        self.symptom_name_2_index = symptom_infos[1]
        self.symptom_data = symptom_infos[2]
        self.multi_choice_flag = self._has_multi_choice_symptoms(self.symptom_data)
        self.all_symptom_names = [
            self.symptom_data[k]["name"] for k in self.symptom_data.keys()
        ]
        self.all_linked_symptoms = self._get_linked_symptoms(self.symptom_data)
        # transform 'self.all_linked_symptoms' from str into symptom indices
        self.all_linked_symptom_indices = {
            self.symptom_name_2_index[self.symptom_data[base_symp_key]["name"]]: [
                self.symptom_name_2_index[self.symptom_data[linked_symp_key]["name"]]
                for linked_symp_key in self.all_linked_symptoms[base_symp_key]
            ]
            for base_symp_key in self.all_linked_symptoms.keys()
        }
        # reverse the linked symptoms map: from linked_symptom => base_symptom
        self.all_linked_reverse_symptom_indices = {
            linked_symp_idx: base_symp_idx
            for base_symp_idx in self.all_linked_symptom_indices
            for linked_symp_idx in self.all_linked_symptom_indices[base_symp_idx]
        }
        for a in symptoms_with_multiple_answers:
            idx = self.symptom_name_2_index[a]
            key = self.symptom_index_2_key[idx]
            data_type = self.symptom_data[key].get("type-donnes", "B")
            if data_type != "M":
                raise ValueError(
                    f"Unconsistency with Symptom {a}: Occured multiple times while"
                    f" not a multiple choice symptom. {symptoms_with_multiple_answers}"
                )

        # load pathologies
        pathology_infos = load_and_check_data(
            condition_filepath, unique_pathos, key_name="condition_name"
        )
        self.pathology_index_2_key = pathology_infos[0]
        self.pathology_name_2_index = pathology_infos[1]
        self.pathology_data = pathology_infos[2]
        self.pathology_defined_symptoms = {}
        # get all pathology severity - # default severity to 0
        self.pathology_severity_data = [
            self.pathology_data[self.pathology_index_2_key[idx]].get("urgence", 0)
            for idx in range(len(self.pathology_index_2_key))
        ]

        # check if the provided df respect the symptom/patho relationships
        for key in self.pathology_data.keys():
            defined_symptom_keys = list(self.pathology_data[key]["symptoms"].keys())
            if "antecedents" in self.pathology_data[key]:
                defined_symptom_keys += list(
                    self.pathology_data[key]["antecedents"].keys()
                )
            defined_symptoms = []
            for k in defined_symptom_keys:
                symp_name = self.symptom_data[k]["name"]
                symp_type = self.symptom_data[k].get("type-donnes", "B")
                # binary symptoms
                if symp_type == "B":
                    defined_symptoms.append(symp_name)
                else:
                    # categorical or multi-choice
                    possible_values = self.symptom_data[k].get("possible-values", [])
                    for v in possible_values:
                        val_name = symp_name + "_@_" + str(v)
                        defined_symptoms.append(val_name)
            defined_symptoms = set(defined_symptoms)
            self.pathology_defined_symptoms[key] = defined_symptoms
            patho = self.pathology_data[key]["condition_name"]

            if patho in patho_symptoms:
                data_symptoms = patho_symptoms[patho]
                diff = data_symptoms - defined_symptoms
                if len(diff) > 0:
                    raise ValueError(
                        f"Unconsistency with patho {patho}: Unauthorized symptoms {diff}"
                    )

   

_load_and_check_symptoms_with_pathos 方法是环境类中的私有方法。它负责根据提供的患者数据加载和检查症状和病理数据的有效性。以下是对该方法中执行的步骤的解释： 加载症状：从symptom_filepath指定的JSON文件中加载症状数据。函数 load_and_check_data 用于根据患者数据中提供的 unique_symptoms 列表验证症状数据。该函数返回三个信息：symptom_index_2_key（症状索引到症状键的映射）、symptom_name_2_index（症状名称到症状索引的映射）、symptom_data（加载的症状数据）。根据加载的数据设置与症状相关的附加属性，例如 multi_choice_flag（表示是否存在多选症状）和 all_symptom_names（所有症状名称的列表）。 处理链接的症状：该方法调用 _get_linked_symptoms 来检索基于相同 code_question 链接在一起的症状组。结果存储在 all_linked_symptoms 中，其中 key 是基本症状，value 是相关症状的列表。然后，该方法通过使用 symptom_name_2_index 将症状名称映射到相应的索引，将 all_linked_symptoms 转换为索引。生成的字典存储在 all_linked_symptom_indices 中。此外，创建反向映射 all_linked_reverse_symptom_indices 以将链接的症状指数映射到它们各自的基本症状指数。 检查多项选择症状：该方法检查在 symptoms_with_multiple_answers 中指定的具有多个答案的症状是否实际上是加载数据中的多项选择症状。它遍历指定的症状，检索它们的索引，并检查症状数据中的 type-donnes 属性。如果该属性不是“M”（表示多选症状），则会引发 ValueError。 加载病理：使用 load_and_check_data 函数从 condition_filepath 指定的 JSON 文件加载病理数据。数据根据患者数据中提供的 unique_pathos 列表进行验证。与症状数据类似，该函数返回三个信息：pathology_index_2_key（病理索引到病理key的映射）、pathology_name_2_index（病理名称到病理索引的映射）和pathology_data（加载的病理数据）。 检查症状-病理关系：该方法遍历加载数据中的每个病理，并检查与病理相关的症状是否与从患者数据中导出的症状相匹配。它从病理数据中检索症状键，包括主要症状和先行症状。然后它会检查症状数据中的症状名称和类型，并为每种病理学构建一组已定义的症状。如果从患者数据（存储在 patho_symptoms 中）派生的症状与定义的病理症状不匹配，则会引发 ValueError。 _load_and_check_symptoms_with_pathos 方法确保加载的症状和病理数据与患者数据一致，检查一致性并验证症状和病理之间的关系。 如果您有任何其他问题，请随时提问！

In [None]:
     def get_symptom_and_value(self, symptom_name):
        """Utility function to get the symptom and the associated value from csv data.

        Given a symptom, find its root and assocaited value
        for example, `douleurxx_carac_@_penible` will return
        `douleurxx_carac` and `penible`.
        Similarly, `fievre` will return `fievre` and  None (which
        is the default value for boolean symptom).

        Parameters
        ----------
        symptom_name: str
            the symptom (from csv) for which we want to retrieve the info data.

        Returns
        -------
        symptom: str
            the symptom name as defined in the config file.
        val: object
            the value associated to the symptom.

        """
        idx = symptom_name.find("_@_")
        if idx == -1:
            # boolean symptom
            return symptom_name, None
        else:
            elem_base = symptom_name[:idx]
            elem_val = symptom_name[idx + 3 :]
            base_idx = self.symptom_name_2_index.get(elem_base, -1)

            assert base_idx != -1, (
                f"The symptom {elem_base} is not defined "
                f"while receiving {symptom_name}!"
            )

            assert self.symptom_possible_val_mapping.get(base_idx)
            if not (elem_val in self.symptom_possible_val_mapping[base_idx]):
                # convert to the right type
                elem_val = int(elem_val)
                assert elem_val in self.symptom_possible_val_mapping[base_idx]

            return elem_base, elem_val

    def _init_demo_features(self, frame, age, sex, race, ethnic):
        """Set the demographic features in the provided observation frame.

        Parameters
        ----------
        frame: np.array
            the observation frame to be updated.
        age: int
            the age of the patient.
        sex: str
            the sex of the patient.
        race: str
            the race of the patient.
        ethnic: str
            the ethnic of the patient.

        Returns
        -------
        result: np.array
            the updated observation frame.

        """
        tmp_init_idx = 1 if self.include_turns_in_state else 0

        # set the age
        frame[tmp_init_idx + encode_age(age)] = 1
        # set the sex
        frame[tmp_init_idx + self.num_age_values + encode_sex(sex)] = 1
        # set the race
        if self.include_race_in_state:
            val = self.num_age_values + self.num_sex_values
            frame[tmp_init_idx + val + encode_race(race)] = 1
        # set the ethnicity
        if self.include_ethnicity_in_state:
            val = self.num_age_values + self.num_sex_values + self.num_race_values
            frame[tmp_init_idx + val + encode_ethnicity(ethnic)] = 1
        # return the updated frame
        return frame

   

get_symptom_and_value 方法是环境类中的一个实用函数。它负责从给定的症状名称中提取症状及其相关值。以下是此方法如何工作的解释： 该方法接收 symptom_name 作为输入。 它在 symptom_name 中搜索子字符串“_@_”。如果没有找到，则表明该症状是一个布尔症状，该方法返回 symptom_name 本身和 None 作为关联值。 如果在 symptom_name 中找到“_@_”，它将把 symptom_name 分成两部分：elem_base 和 elem_val。 elem_base 表示根症状名称，elem_val 表示关联值。 它通过在 symptom_name_2_index 字典中查找来检查 elem_base 是否是有效症状。如果在字典中找不到 elem_base，则会引发 ValueError。 它通过验证 elem_val 是否存在于相应基本索引的 symptom_possible_val_mapping 字典中来检查 elem_val 是否为 elem_base 症状的有效值。如果在字典中找不到 elem_val，它会假定该值是一个整数并尝试转换它。如果仍未在字典中找到转换后的值，则会引发 ValueError。 最后，它返回 elem_base 作为症状，并返回 elem_val 作为关联值。 _init_demo_features 方法是环境类中的另一个实用函数。它用于在提供的观察框架中设置人口统计特征（年龄、性别、种族和民族）。以下是此方法如何工作的解释： 该方法接收框架（观察框架）、年龄、性别、种族和民族作为输入。 它根据 include_turns_in_state 是 True 还是 False 来初始化 tmp_init_idx。该变量用于确定在观察框架中设置人口统计特征的起始索引。 它使用 encode_age 函数对年龄进行编码，并将帧中的相应索引设置为 1。 它使用 encode_sex 函数对性别进行编码，并将帧中的相应索引设置为 1。 如果 include_race_in_state 为 True，它使用 encode_race 函数对比赛进行编码，并将帧中的相应索引设置为 1。 如果 include_ethnicity_in_state 为 True，它使用 encode_ethnicity 函数对种族进行编码，并将帧中的相应索引设置为 1。 最后，它返回更新后的帧。 这些效用函数用于从给定的症状名称中提取症状值，并在观察框架中设置人口统计特征。 如果您还有其他问题，请随时提问！

In [None]:
     def _compute_differential_probs(self, differential):
        """Compute the differential probability from the diffential scores.

        Parameters
        ----------
        differential: dict
            Map of the pathology id in the differential to its sommeOR and score
            as returned by DXA.

        Returns
        -------
        indices: np.ndarray
            the array correspondind to the pathology indices involved in the
            differential (-1 is used for padding).
        probability: np.ndarray
            the array representing the computed probabilities associated to the
            pathologies represented by `indices`.

        """
        if differential is None or not self.use_differential_diagnosis:
            return None, None
        else:
            assert len(differential) <= self.max_differential_len
            indices = np.ones(self.max_differential_len, dtype=int) * -1
            probability = np.zeros(self.max_differential_len, dtype=np.float32)
            sumProba = 0
            for i, k in enumerate(differential.keys()):
                indices[i] = k
                sommeOR, _ = differential[k]
                proba = sommeOR / (1.0 + sommeOR)
                sumProba += proba
                probability[i] = proba
            if sumProba != 0:
                probability = probability / sumProba
            else:
                probability[0 : len(differential)] = 1.0 / len(differential)

        # sort in desceding order according to proba
        s_ind = np.argsort(probability, axis=-1)
        probability = probability[s_ind[::-1]]
        indices = indices[s_ind[::-1]]

        return indices, probability

    def _from_inquiry_action_to_symptom_index(self, action):
        """Get the index of the symptom corresponding to the inquiry action.

        Parameters
        ----------
        action: int
            the action from which we want to find the corresponding
            symptom index.

        Returns
        -------
        index: int
            index of the symptom associated with the provided action.

        """
        if self._has_int_action():
            symptom_index = action
        else:
            symptom_index = action[1]

        if isinstance(symptom_index, np.ndarray):
            symptom_index = symptom_index.item()

        return symptom_index

   

_compute_differential_probs 方法和_from_inquiry_action_to_symptom_index 方法是类中的两个函数，但它们不是特定类定义的一部分。以下是对每种方法的解释： _compute_differential_probs（自我，微分）： 该方法根据差异分数计算差异概率。 它以字典差异作为输入，将差异中的病理 ID 映射到它们的 sommeOR 和分数。 如果 differential 为 None 或 use_differential_diagnosis 标志为 False，则返回 None，None。 否则，它使用最大差分长度 (self.max_differential_len) 初始化数组索引和概率。 然后它遍历差异字典的键并计算每个病理的概率。 计算出的概率存储在概率数组中，病理指标存储在索引数组中。 通过将概率除以所有概率的总和来对概率进行归一化，除非总和为 0。在这种情况下，将相等的概率分配给所有病理。 最后将probability和indices数组按照概率降序排列，返回排序后的数组。 _from_inquiry_action_to_symptom_index（自我，行动​​）： 该方法用于获取查询动作对应的症状索引。 它需要一个动作作为输入，输入可以是整数或元组，具体取决于动作在模型中是表示为整数还是元组。 如果 self._has_int_action() 为 True，则动作本身被视为症状索引。 如果 self._has_int_action() 为 False，则从动作元组的第二个元素中提取症状索引。 如果症状索引表示为 NumPy 数组，则会将其转换为整数。 返回提取的症状索引。 

In [None]:
 def _from_symptom_index_to_inquiry_action(self, symptom_index):
        """Get the inquiry action corresponding to the provided symptom index.

        Parameters
        ----------
        symptom_index: int
            the index of the symptom from which we want to find
            the corresponding action.

        Returns
        -------
        action: int
            the inquiry action associated with the provided symptom.

        """
        if self._has_int_action():
            return symptom_index
        else:
            return [0, symptom_index, 0]

    def _from_symptom_index_to_frame_index(self, symptom_index, symptom_val=None):
        """Get the frame index corresponding to the symptom_index.

        Parameters
        ----------
        symptom_index: int
            the index of the symptom from which we want to find
            the corresponding action.
        symptom_val: obj
            the value associated to the the symptom indexed by
            the provided index. Default: None

        Returns
        -------
        index: int
            frame index associated with the provided action.

        """
        data_type = self.symptom_data_types[symptom_index]

        if data_type == "B":
            return self.symptom_to_obs_mapping[symptom_index][0]
        elif data_type == "C" and (symptom_index in self.categorical_integer_symptoms):
            return self.symptom_to_obs_mapping[symptom_index][0]
        else:
            if symptom_val is None:
                raise ValueError("The expected value is NOT supposed to be None.")
            idx = self.symptom_possible_val_mapping[symptom_index][symptom_val]
            return self.symptom_to_obs_mapping[symptom_index][0] + idx

    

_from_symptom_index_to_inquiry_action 方法和 _from_symptom_index_to_frame_index 方法是类中的另外两个函数，但它们不是特定类定义的一部分。以下是对每种方法的解释： _from_symptom_index_to_inquiry_action（自我，symptom_index）： 该方法用于获取提供的症状指标对应的查询动作。 它以 symptom_index 作为输入，这是需要相应操作的症状的索引。 如果 self._has_int_action() 为 True，则将 symptom_index 本身视为查询动作并返回。 如果 self._has_int_action() 为 False，则返回列表 [0, symptom_index, 0] 作为查询操作。 _from_symptom_index_to_frame_index（自身，symptom_index，symptom_val=None）： 该方法用于获取symptom_index对应的帧索引。 它以 symptom_index 作为输入，它是需要帧索引的症状的索引。 它还采用可选的 symptom_val 参数，该参数是与由提供的索引索引的症状关联的值。 该方法首先检查由 symptom_index 索引的症状的数据类型。 如果数据类型是“B”（二进制症状）或“C”（分类整数症状），它会从 self.symptom_to_obs_mapping[symptom_index] 返回第一帧索引。 如果数据类型不同且未提供 symptom_val，则会引发 ValueError 指示预期值不应为 None。 如果提供了 symptom_val，它会从 self.symptom_possible_val_mapping[symptom_index] 中检索相应的索引 (idx)。 该方法然后返回来自 self.symptom_to_obs_mapping[symptom_index] 和 idx 的第一帧索引的总和。 

In [None]:
def _from_inquiry_action_to_frame_index(self, action):
        """Get the frame index corresponding to the inquiry action.

        Parameters
        ----------
        action: int
            the action from which we want to find the corresponding
            frame index.

        Returns
        -------
        indices: couple of int
            frame indices interval [start, end] associated with
            the provided action.

        """
        symptom_index = self._from_inquiry_action_to_symptom_index(action)

        start_idx = self.symptom_to_obs_mapping[symptom_index][0]
        end_idx = self.symptom_to_obs_mapping[symptom_index][1]

        return start_idx, end_idx




_from_inquiry_action_to_frame_index 方法用于确定与给定查询操作对应的帧索引。它是这样工作的： action：需要帧索引的查询动作。 该方法执行以下步骤： 调用 _from_inquiry_action_to_symptom_index 方法，将操作作为参数传递。此方法检索与提供的操作关联的症状索引。 从 self.symptom_to_obs_mapping[symptom_index] 检索开始和结束索引。这些索引定义了与提供的操作相关联的帧索引间隔。 返回一个元组 (start_idx, end_idx)，表示与提供的操作相关联的帧索引区间 [start, end]。 总之，该方法充当查询动作与其对应帧索引之间的映射，允许系统根据用户输入识别相关帧。

In [None]:
if __name__ == '__main__':
    class AttrDict(dict):
        def __init__(self, *args, **kwargs):
            super(AttrDict, self).__init__(*args, **kwargs)
            self.__dict__ = self
    
    args = AttrDict({
        'no_initial_evidence': False,
        'MAXSTEP': 30,
        'include_turns_in_state': True,
        'no_differential': False,
        'evi_meta_path': "./release_evidences.json",
        'patho_meta_path': "./release_conditions.json",
        'train': True,
    })
    patient_filepath = "./release_validate_patients.zip"
    np.random.seed(1000)
    random.seed(1000)
    env = environment(args, patient_filepath)

提供的代码设置环境类的实例并使用给定的参数对其进行初始化。这是代码的细分： 定义了 AttrDict 类，它是内置 dict 类的子类。它允许将字典键作为属性访问。 创建了一个名为 args 的 AttrDict 实例，其中包含环境的各种配置设置。 patient_filepath 变量被指定为患者文件的路径。 NumPy 的随机数生成器种子 (np.random.seed(1000)) 和 Python 的内置随机模块 (random.seed(1000)) 被设置为确保随机操作的可重复性。 创建环境类的一个实例，命名为 env，将 args 和 patient_filepath 作为参数。这将使用提供的配置设置和患者数据初始化环境。 此代码片段的目的似乎是为某种医学诊断或模拟系统设置环境。环境类可能封装了系统与用户/患者数据之间交互所需的逻辑和功能。