In [1]:
import os
import re
import warnings
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import xorbits.pandas as xpd
import utils
from utils import get_3periods, parse_man_info, quick_load_liuzhong_health_check_data

%load_ext autoreload
%autoreload 2
warnings.filterwarnings("ignore")


In [2]:
dir_path = "/cluster/home/bqhu_jh/projects/healthman/"

# Location of the rebalanced copy of the detail table. It is written once and
# then reused as an on-disk cache by later runs.
rebalanced_path = f"{dir_path}/analysis/detail_rebalanced.parquet"

# The cache location is created as a *directory* below, so an `os.path.isfile`
# check on it can never succeed and the rewrite would happen on every run.
# Accept either a single file or a non-empty directory as "cache present".
# NOTE(review): the cache is not invalidated when detail.parquet changes;
# delete the rebalanced copy manually to force a rebuild.
rebalanced_cache_ready = os.path.isfile(rebalanced_path) or (
    os.path.isdir(rebalanced_path) and len(os.listdir(rebalanced_path)) > 0
)

if not rebalanced_cache_ready:
    # Pure-Python equivalent of `mkdir -p`; no shell involved.
    os.makedirs(rebalanced_path, exist_ok=True)
    df_merged_pq = xpd.read_parquet(f"{dir_path}/analysis/detail.parquet").rebalance()
    df_merged_pq.to_parquet(rebalanced_path)

# Always continue from the rebalanced copy on disk.
df_merged_pq = xpd.read_parquet(rebalanced_path)



  0%|          |   0.00/100 [00:00<?, ?it/s]

In [3]:
# Names that have a matching `utils.parse_<name>` function; each name is
# mapped to that function so it can be looked up by item id.
l_cols = ["default_num"]
dict_parse_func = dict(
    (parser_name, getattr(utils, f"parse_{parser_name}")) for parser_name in l_cols
)
def process_line(x):
    """Parse the raw result of one record with the parser registered for its item.

    Args:
        x: Row-like mapping that provides the keys "item_id" and "results".

    Returns:
        The value returned by the selected parser for ``x["results"]``.
    """
    item_id = x["item_id"]
    raw_result = x["results"]

    # Items without a dedicated parser are handled by the "default_num" parser.
    parser_key = item_id if item_id in dict_parse_func else "default_num"
    parser = dict_parse_func[parser_key]
    return parser(raw_result)

In [4]:
# Ordered list of the item ids and derived feature names this notebook keeps.
# The order matters: the list is used both to filter detail rows and to select
# (and therefore order) the columns of the final tables.
l_high_lighted = [
    # item ids
    "v000760", "v000761", "v002266", "v000763", "v9319", "v000762", "v002161",
    # HEART.* feature names
    "HEART.Normal", "HEART.SinusRhythm", "HEART.T_change",
    "HEART.SinusBradycardia", "HEART.ST_change", "HEART.SinusArrhythmia",
    "HEART.ElectricAxisDeviation", "HEART.LVH", "HEART.SinusTachycardia",
    "HEART.AtrialFibrillation", "HEART.HeartBlocks", "HEART.ExtraSystole",
    # LUNG.* feature names
    "LUNG.GroundGlassOpacity", "LUNG.Effusion", "LUNG.Fibrosis",
    "LUNG.Calcification", "LUNG.ProliferativeLesion", "LUNG.SolidNodule",
    "LUNG.SmallNodule", "LUNG.Nodule", "LUNG.Normal",
    # item ids
    "v0009", "v0008", "v0010", "v1235", "v0023", "v1970",
    "v1164", "v9046", "v9047", "v9048", "v9049", "v9050",
    "v9057", "v9064", "v9045", "v9058", "v9059", "v9060",
    "v9061", "v9062", "v9063", "v9065", "v9066", "v9051",
    "v9055", "v9089", "v9090", "v9091", "v0017", "v0045",
    "v0022", "v1162", "v1940", "v1925",
    # LIVER.* feature name
    "LIVER.Steatosis",
    # item ids
    "v9453", "v0955", "v9461", "v9462", "v9464",
    "v0015", "v0016", "v0027", "v0026", "v0032",
]

## 1. xdf_num_pvt: parsed item values pivoted to one row per exam

In [5]:
# Keep only the detail rows whose item id is in the highlighted list.
l_numeric_col = l_high_lighted
is_selected_item = df_merged_pq["item_id"].isin(l_numeric_col)
xdf_merged_pq_num = df_merged_pq[is_selected_item]

# Parse the raw result of every kept row with the parser chosen by its item id.
xdf_merged_pq_num["value"] = xdf_merged_pq_num[["item_id", "results"]].apply(process_line, axis=1)

# One row per exam, one column per item; repeated measurements of the same
# item within an exam are averaged after conversion to float.
xdf_num_pvt = xpd.pivot_table(
    xdf_merged_pq_num[["exam_id", "item_id", "value"]],
    index="exam_id",
    columns="item_id",
    values="value",
    aggfunc=lambda values: values.astype(float).mean(),
)

## 2. x_text_dfs: keyword flags detected in report conclusions, one row per exam

In [6]:
def detect_descriptions(x, descriptions):
    """Flag whether a text matches any of several regular expressions.

    Args:
        x: Text to search. Missing values (as reported by ``xpd.isna``) are
            passed through unchanged.
        descriptions: One or more regular expressions separated by "|"; each
            part is searched for separately.

    Returns:
        ``x`` itself when it is missing, otherwise 1 if at least one part
        matches somewhere in ``x`` and 0 if none does.
    """
    if xpd.isna(x):
        return x

    patterns = descriptions.split("|")
    # `any` stops at the first match, exactly like an early return in a loop.
    return int(any(re.search(pattern, x) for pattern in patterns))


def get_df_scan(df_merged, scan_items, dict_descriptions):
    """Build long-format keyword flags for the conclusions of selected scans.

    Args:
        df_merged: Detail table providing the columns "combine_item_name",
            "exam_id" and "conclusion".
        scan_items: Values of "combine_item_name" whose rows are kept.
        dict_descriptions: Mapping of search pattern -> output feature name.
            Each pattern is handed to ``detect_descriptions``.

    Returns:
        The melted frame with "exam_id" as identifier plus the "variable"
        (feature name) and "value" (flag) columns produced by ``melt``.
    """
    df_merged_scan = df_merged[df_merged["combine_item_name"].isin(scan_items)][["exam_id", "conclusion"]]
    for pattern, feature_name in dict_descriptions.items():
        # Bind the current pattern as a default argument. A plain closure over
        # the loop variable is late-binding: if the apply is evaluated after
        # the loop has finished (deferred execution), every column would be
        # computed with the last pattern of the dictionary.
        df_merged_scan[feature_name] = df_merged_scan["conclusion"].apply(
            lambda x, pattern=pattern: detect_descriptions(x, pattern)
        )

    return xpd.melt(df_merged_scan.drop(["conclusion"], axis=1), id_vars=["exam_id"])

# Keyword-extraction configuration, one entry per group of examinations.
#   "scan_items"        : values of "combine_item_name" whose rows are scanned.
#   "dict_descriptions" : search pattern -> name of the resulting flag column.
# Each pattern is split on "|" and every part is searched as a regular
# expression in the "conclusion" text, so "a|b" means "a or b".
l_params = [
    {
        # Electrocardiogram conclusions -> HEART.* flags.
        "scan_items" : ["常规心电图检查"],
        "dict_descriptions" : {
            "正常心电图": "HEART.Normal",
            "窦性心律;": "HEART.SinusRhythm",
            "T波改变": "HEART.T_change",
            "窦性心动过缓": "HEART.SinusBradycardia",
            "ST段" : "HEART.ST_change",
            "窦性心律不齐": "HEART.SinusArrhythmia",
            "电轴": "HEART.ElectricAxisDeviation",
            "左室高电压": "HEART.LVH",
            "窦性心动过速": "HEART.SinusTachycardia",
            "房颤|房性" : "HEART.AtrialFibrillation",
            "传导阻滞": "HEART.HeartBlocks",
            "早搏" : "HEART.ExtraSystole",
        }
    },
    {
        # Ultrasound / CT conclusions -> LIVER.* flag.
        # NOTE(review): "\x01" inside the item name looks like a garbled
        # character (possibly "C") -- confirm against the source data.
        "scan_items" : ["肝胆脾胰（彩超）", "肝胆脾胰+双肾（彩超）", "胸部CT平扫", "肺部HR \x01T平扫"],
        "dict_descriptions" : {
            "脂肪肝": "LIVER.Steatosis",
        }
    },
    {
        # Chest CT / DR conclusions -> LUNG.* flags.
        "scan_items" : ["胸部CT平扫", "肺部CT平扫","肺部HR \x01T平扫", 
                        "数字化摄影(DR)", "+胸部CT平扫", "CT胸部平扫", "CT肺部HR平扫"],
        "dict_descriptions" : {
            "磨玻璃": "LUNG.GroundGlassOpacity",
            "积液": "LUNG.Effusion",
            "纤维": "LUNG.Fibrosis",
            "钙化": "LUNG.Calcification",
            "增殖灶": "LUNG.ProliferativeLesion",
            # The next three patterns overlap: any text containing "实性结节"
            # or "小结节" also contains "结节", so LUNG.Nodule is set as well.
            "实性结节": "LUNG.SolidNodule",
            "小结节": "LUNG.SmallNodule",
            "结节": "LUNG.Nodule",
            "未见明显异常": "LUNG.Normal"
        }
    }
]

# Run the keyword extraction once per configured scan group, stack the
# long-format results and pivot them to one row per exam / one column per
# flag. When an exam has several rows for a flag, the maximum is kept.
l_scan_frames = [get_df_scan(df_merged_pq, **param) for param in l_params]
x_text_dfs = xpd.concat(l_scan_frames).pivot_table(
    index="exam_id",
    columns="variable",
    values="value",
    aggfunc=np.max,
)

## 3. xdf_main: main exam table with year/month/day, gender and age columns

In [7]:
def parse_main_pq(file_main_pq):
    """Load the main table and derive date-part, gender and age columns.

    Args:
        file_main_pq: Path of the parquet file to read. The table must provide
            "exam_date" and "birthday" as "-"-separated strings whose first
            field is the year, and a "gender" column.

    Returns:
        The loaded frame with integer "year", "month" and "day" columns taken
        from "exam_date", "gender" recoded to "male" (value 1) or "female"
        (any other value), and "age" computed as exam year minus birth year.
    """
    def _date_field(position):
        # Extractor for one "-"-separated field of a date string.
        return lambda date_text: date_text.split("-")[position]

    xdf_main = xpd.read_parquet(file_main_pq)

    for position, column in enumerate(("year", "month", "day")):
        xdf_main[column] = xdf_main["exam_date"].apply(_date_field(position)).astype(np.int32)

    xdf_main["gender"] = xdf_main["gender"].apply(
        lambda code: "male" if code == 1 else "female"
    )
    # Age is a difference of calendar years; month and day are not considered.
    xdf_main["age"] = xdf_main.apply(
        lambda row: int(row["year"]) - int(row["birthday"].split("-")[0]),
        axis=1,
    )
    return xdf_main


xdf_main = parse_main_pq(f"{dir_path}/analysis/main.parquet")

In [8]:
# Columns that identify one examination of one person in the wide table.
l_id_columns = ["birthday", "year", "month", "day", "gender", "sample_id"]

# Attach the pivoted item values and text flags (outer-joined with each other)
# to the main table, then drop the raw date string and rebalance.
xdf_features_wide = xdf_main.join(xdf_num_pvt.join(x_text_dfs, how="outer"))
xdf_features_wide = xdf_features_wide.drop(["exam_date"], axis=1).rebalance()

# Long format and back to wide, averaging rows that share the same
# identifier columns.
xdf_table1plus_pvt = xpd.melt(xdf_features_wide, id_vars=l_id_columns).pivot_table(
    index=l_id_columns,
    columns="variable",
    values="value",
    aggfunc=np.mean,
)

# Materialise as a pandas frame restricted to the identifier columns followed
# by the highlighted features, in that order.
df_table1plus_final_allmonths = xdf_table1plus_pvt.reset_index()[
    l_id_columns + l_high_lighted
].to_pandas()

# Post-process the all-months table with the project helper.
df_table1plus_final = parse_man_info(df_table1plus_final_allmonths)

# Binarise the text-derived flag columns: values greater than 0 become 1 and
# everything else (including missing values, for which `> 0` is False)
# becomes 0. A vectorised comparison replaces the per-cell Python lambda that
# went through `DataFrame.applymap`, which is deprecated since pandas 2.1.
l_text_columns = x_text_dfs.columns
df_table1plus_final.loc[:, l_text_columns] = (
    df_table1plus_final[l_text_columns].gt(0).astype(int)
)

# Derive the period table and the two id lists with the project helper.
df_table1plus_3p_rev_month, l_consecute_man2p, l_consecute_man3p = get_3periods(
    df_table1plus_final, l_high_lighted, l_text_columns
)

  0%|          |   0.00/100 [00:00<?, ?it/s]

  0%|          |   0.00/100 [00:00<?, ?it/s]

  0%|          |   0.00/100 [00:00<?, ?it/s]

  0%|          |   0.00/100 [00:00<?, ?it/s]

In [9]:
# Feature metadata, indexed by item id while keeping "item_id" as a column.
file_meta = f"{dir_path}/analysis/feature_groups_en_v3.csv"
df_meta_group = pd.read_csv(file_meta).set_index("item_id", drop=False)

# item id -> English item name.
rename_dict = df_meta_group["item_name_en"].to_dict()

output_dir = f"{dir_path}/analysis/"

# Bundle the rename mapping and the two lists returned by get_3periods.
dict_obj = dict(
    rename_dict=rename_dict,
    l_consecute_man2p=l_consecute_man2p,
    l_consecute_man3p=l_consecute_man3p,
)
with open(f"{output_dir}/man_info.pickle", "wb") as f_out:
    pickle.dump(dict_obj, f_out)

# Persist every result table as parquet, in the same order as before.
for frame_to_save, parquet_path in (
    (df_table1plus_final_allmonths, f"{output_dir}/tableOnePlusData-final_allmonths.parquet"),
    (df_table1plus_final, f"{output_dir}/tableOnePlusData-final.parquet"),
    (df_table1plus_3p_rev_month, f"{output_dir}/tableOnePlusData-final_3p.parquet"),
    (df_meta_group, f"{output_dir}/feature_groups_en_v3.parquet"),
):
    frame_to_save.to_parquet(parquet_path)