In [1]:
import pandas as pd
import polars as pl
import gc
import json
import glob
import hashlib

In [2]:
def j2d(f):
    """Load a JSON file and return the decoded Python object.

    Parameters
    ----------
    f : str or path-like
        Path of the JSON file to read.

    Returns
    -------
    object
        Whatever ``json.load`` produces for the file (dict, list, ...).

    Raises
    ------
    OSError
        If the file cannot be opened.
    json.JSONDecodeError
        If the file content is not valid JSON.
    """
    # JSON text is UTF-8 by specification; do not rely on the platform's
    # default encoding, which differs between operating systems/locales.
    with open(f, "rt", encoding="utf-8") as file:
        return json.load(file)

def d2j(d, f):
    """Write ``d`` to the file ``f`` as JSON with a 2-space indent.

    Parameters
    ----------
    d : object
        JSON-serializable object (dict, list, ...).
    f : str or path-like
        Destination path; an existing file is overwritten.

    Raises
    ------
    TypeError
        If ``d`` contains a value that ``json`` cannot serialize. In that
        case the destination file is left untouched.
    OSError
        If the file cannot be opened for writing.
    """
    # Serialize before opening the file: opening in "w" mode truncates it, so
    # a serialization error would otherwise destroy the previous content.
    text = json.dumps(d, indent=2)
    with open(f, "wt", encoding="utf-8") as file:
        file.write(text)

In [3]:
# Settings dict; d_ss["stretch"] is used further down as the hash seed that
# assigns each patient to a split group.
d_ss = j2d(f="../../share3/mat/d_ss.json")

In [4]:
# NOTE(review): this renders every value in d_ss into the saved notebook,
# including the "stretch" value used as the hash seed for patient grouping.
# If these values are meant to stay private, clear this output before sharing.
d_ss

{'solt': '89sryf', 'stretch': 62}

In [5]:
# Code list stored under the "cancercode" key.
l_cancer = j2d("../../share3/mat/cans_codes.json")["cancercode"]
l_cancer[:3]  # evaluated mid-cell, so only the final expression below is rendered

# Code list stored under the "atcsubcode" key; used later as the set of
# column names dropped from the diabetes diagnosis table.
atcsub = j2d("../../share3/mat/atc_codes.json")["atcsubcode"]
atcsub[:3]

['A10AC', 'A10AD', 'A10AB']

In [6]:
# Split labels "0".."9": one per possible last digit of the patient hash.
lx = list(map(str, range(10)))
lx

['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

In [7]:
# %%time
# f = "../../share3/tekiyo_all.parquet"
# pts_ = pl.scan_parquet(f)
# pts_.fetch()

In [8]:
%%time
# Alternative input kept for reference:
# f = "../../share3/tekiyo.parquet"
f = "../../share3/tekiyo_all.parquet"

# Lazy plan over the patient table. Nothing is read until a sink is executed.
pts_ = (
    pl.scan_parquet(f)
    # "/01" is appended to each year-month string before parsing, so every
    # date lands on the first day of its month (presumably "YYYY/MM" input;
    # the datetime format itself is inferred by polars).
    .with_columns(
        (pl.col("observable_start_ym") + "/01").str.to_datetime().alias("start"),
        (pl.col("observable_end_ym") + "/01").str.to_datetime().alias("end"),
        (pl.col("birth_ym") + "/01").str.to_datetime().alias("birth"),
    )
    .with_columns(
        # Age at start of observation: elapsed days / 365.25.
        ((pl.col("start") - pl.col("birth")).dt.total_days() / 365.25).alias("age_at_start"),
        # Follow-up length: elapsed days * 12 / 365.25, rounded to a whole number.
        (
            (pl.col("end") - pl.col("start")).dt.total_days() * 12 / 365.25
        ).round(0).cast(pl.Int32).alias("follow"),
    )
    .with_columns(
        # Group label = last character of the seeded hash rendered as a string.
        # NOTE(review): polars documents Expr.hash as stable only within one
        # polars version; files split by "grp" elsewhere must have been
        # produced with the same version and seed for the splits to line up.
        pl.col("kojin_id")
        .hash(seed=d_ss["stretch"])
        .cast(pl.String)
        .str.slice(-1, 1)
        .alias("grp"),
    )
)

# One output file per group label; each iteration executes the plan again
# with a different filter.
for i in lx:
    f_out = "../../share3/smr/df_pt_split_{}.parquet".format(i)
    print(f_out)
    (
        pts_
        .filter(pl.col("grp") == i)
        .sink_parquet(f_out, maintain_order=True)
    )
    gc.collect()
    # break  # uncomment to stop after the first split while debugging

../../share3/smr/df_pt_split_0.parquet
../../share3/smr/df_pt_split_1.parquet
../../share3/smr/df_pt_split_2.parquet
../../share3/smr/df_pt_split_3.parquet
../../share3/smr/df_pt_split_4.parquet
../../share3/smr/df_pt_split_5.parquet
../../share3/smr/df_pt_split_6.parquet
../../share3/smr/df_pt_split_7.parquet
../../share3/smr/df_pt_split_8.parquet
../../share3/smr/df_pt_split_9.parquet
CPU times: user 5min 46s, sys: 3min 27s, total: 9min 13s
Wall time: 4min 14s


In [8]:
%%time
def _as_event_rows(events, patients, event_type):
    """Attach patient attributes to event rows and put them on the follow-up layout.

    ``events`` must carry ``kojin_id`` and ``receipt_ym_dt``. The patient
    columns are left-joined on ``kojin_id``, the patient's own ``end`` is
    dropped, ``event_type`` is added from the given expression, ``time`` is
    set to ``receipt_ym_dt - start``, and ``receipt_ym_dt`` is renamed to ``end``.
    """
    return (
        events
        .join(other=patients, how="left", on=["kojin_id"])
        .select(pl.exclude("end"))
        .with_columns(
            event_type.alias("event_type"),
            (pl.col("receipt_ym_dt") - pl.col("start")).alias("time"),
        )
        .rename({"receipt_ym_dt": "end"})
    )


for i in lx:
    # --- patients of this split: one "cens" row each, time = end - start ---
    f = "../../share3/smr/df_pt_split_{}.parquet".format(i)
    pts_ = pl.scan_parquet(f, low_memory=True).select(
        pl.col(
            "kojin_id", "birth_ym", "sex_code", "start", "end",
            "birth", "age_at_start", "follow", "grp",
        )
    )
    pts_follow = pts_.with_columns(
        pl.lit("cens").alias("event_type"),
        (pl.col("end") - pl.col("start")).alias("time"),
    )

    # --- diabetes diagnoses: earliest receipt month per patient ---
    f = "../../share3/diag/df_dm_diag_split_{}.parquet".format(i)
    dm_ = (
        pl.scan_parquet(f, low_memory=True)
        .with_columns(
            (pl.col("receipt_ym") + "/01").str.to_datetime().alias("receipt_ym_dt")
        )
        .select(pl.exclude(atcsub))
        .group_by("kojin_id")
        .agg(pl.col("receipt_ym_dt").min())
    )
    pts_dm = _as_event_rows(dm_, pts_, pl.lit("dm"))

    # --- cancer diagnoses: one row per record, labelled by ICD-10 sub code ---
    f = "../../share3/diag/df_cancers_diag_split_{}.parquet".format(i)
    cans_ = (
        pl.scan_parquet(f, low_memory=True)
        .with_columns(
            (pl.col("receipt_ym") + "/01").str.to_datetime().alias("receipt_ym_dt")
        )
        # "grp" is dropped here because the patient table brings its own.
        .select(pl.exclude("grp"))
    )
    pts_cans = _as_event_rows(cans_, pts_, pl.col("icd10_sub_code")).select(
        pl.exclude(["icd10_sub_code", "with_cancer_diag_codes", "receipt_ym"])
    )

    # --- stack the three row sets; "diagonal" fills missing columns with nulls ---
    pts_follow_dm_cans = pl.concat(
        [pts_follow, pts_dm, pts_cans],
        how="diagonal",
    ).sort("kojin_id")

    f_out = "../../share3/smr/df_pt_smr_nc_{}.parquet".format(i)
    print(f_out)
    pts_follow_dm_cans.sink_parquet(f_out)
    gc.collect()
    # break  # uncomment to stop after the first split while debugging

# pts_follow.fetch()
# dm_.fetch()
# pts_dm.collect()
# cans_.fetch()

# pts_dm.collect()
# pts_follow_dm_cans.collect()

../../share3/smr/df_pt_smr_nc_0.parquet
../../share3/smr/df_pt_smr_nc_1.parquet
../../share3/smr/df_pt_smr_nc_2.parquet
../../share3/smr/df_pt_smr_nc_3.parquet
../../share3/smr/df_pt_smr_nc_4.parquet
../../share3/smr/df_pt_smr_nc_5.parquet
../../share3/smr/df_pt_smr_nc_6.parquet
../../share3/smr/df_pt_smr_nc_7.parquet
../../share3/smr/df_pt_smr_nc_8.parquet
../../share3/smr/df_pt_smr_nc_9.parquet
CPU times: user 1min 16s, sys: 12.9 s, total: 1min 28s
Wall time: 18.9 s


In [15]:
# pl.scan_parquet("../../share3/smr/df_pt_smr_nc_0.parquet").fetch()