In [0]:
# CÀI THƯ VIỆN (nếu chưa có)
# Lưu ý: trong Serverless, %pip cài cho cả driver & worker
%pip install pydicom==3.0.1
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# CẤU HÌNH & HÀM CHUNG (SERVERLESS-SAFE)
import re
from typing import Iterator, List, Dict, Any
import pandas as pd
import pydicom
import logging

from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType
)

# Volume gốc (FUSE path – dùng cho pydicom)
VOLUME_ROOT = "/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/"

CATALOG = "dbms"
SCHEMA  = "bronze"
TABLE   = "mri_dicom_new"

# Regex cho /MRI-data/<id>/<study>/<series>/<file>
_TRIPLET = re.compile(r"/MRI-data/([^/]+)/([^/]+)/([^/]+)/[^/]+$")

def parse_triplet(path: str):
    m = _TRIPLET.search(path)
    if not m: return None, None, None
    return m.group(1), m.group(2), m.group(3)

def list_files_recursive(prefix: str) -> List[str]:
    """
    Duyệt đệ quy bằng dbutils.fs.ls.
    Trả về đường dẫn FUSE (/Volumes/...) để pydicom đọc trực tiếp.
    """
    stack = [prefix if prefix.endswith("/") else prefix + "/"]
    out: List[str] = []
    while stack:
        cur = stack.pop()
        for info in dbutils.fs.ls(cur):
            p_dbfs = info.path                   # dbfs:/Volumes/...
            p_fuse = p_dbfs.replace("dbfs:", "") # /Volumes/...
            if info.isDir():
                stack.append(p_dbfs)
            else:
                out.append(p_fuse)
    return out

def get_and_transform(dcm, tag: str):
    v = getattr(dcm, tag)
    if isinstance(v, (list, tuple)):
        return ",".join(map(str, v))
    return str(v)

# Schema tường minh cho kết quả mapInPandas (snake_case)
schema = StructType([
    StructField("id",                        StringType(), True),
    StructField("study",                     StringType(), True),
    StructField("series",                    StringType(), True),
    StructField("path",                      StringType(), True),

    StructField("instance_creation_time",    StringType(), True),
    StructField("sop_class_uid",             StringType(), True),
    StructField("sop_instance_uid",          StringType(), True),
    StructField("study_time",                StringType(), True),
    StructField("series_time",               StringType(), True),
    StructField("acquisition_time",          StringType(), True),
    StructField("modality",                  StringType(), True),
    StructField("study_description",         StringType(), True),
    StructField("series_description",        StringType(), True),
    StructField("patient_sex",               StringType(), True),
    StructField("patient_age",               StringType(), True),
    StructField("patient_size",              StringType(), True),
    StructField("patient_weight",            StringType(), True),
    StructField("body_part_examined",        StringType(), True),
    StructField("patient_position",          StringType(), True),
    StructField("study_instance_uid",        StringType(), True),
    StructField("series_instance_uid",       StringType(), True),
    StructField("image_orientation_patient", StringType(), True),
])


In [0]:
# COMMAND ----------
# LẤY DANH SÁCH FILE & TẠO DATAFRAME ĐƯỜNG DẪN
paths = list_files_recursive(VOLUME_ROOT)
n = len(paths)
print(f"Discovered files: {n}")

# Tạo DataFrame 'path' (không dùng sparkContext)
df_paths = spark.createDataFrame([(p,) for p in paths], ["path"])

# Chia partition để song song hoá (điều chỉnh tuỳ cluster)
TARGET_FILES_PER_PART = 400
num_partitions = max(128, n // TARGET_FILES_PER_PART or 1)
df_paths = df_paths.repartition(num_partitions)

display(df_paths.limit(10))


Discovered files: 48345


path
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0507/L-SPINE_LSS_20160207_151814_188000/T1_TSE_TRA_0006/T1_TSE_TRA__0507_013.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0509/L-SPINE_CLINICAL_LIBRARIES_20160626_122428_875000/T1_TSE_SAG_320_0003/T1_TSE_SAG__0509_004.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0512/L-SPINE_LSS_20160109_135229_694000/LOCALIZER_0001/LOCALIZER_0_0512_007.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0514/L-SPINE_LSS_20151202_135421_609000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0514_001.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0515/L-SPINE_LSS_20151031_123547_778000/PD_TSE_SAG_320_0015/PD_TSE_SAG__0515_013.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0515/L-SPINE_LSS_20151031_123547_778000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0515_003.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0517/L-SPINE_LSS_20160307_103636_358000/T1_TSE_SAG_320_0003/T1_TSE_SAG__0517_005.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0518/L-SPINE_LSS_20160229_144047_477000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0518_016.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0519/L-SPINE_LSS_20151216_113700_531000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0519_014.ima
/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0521/L-SPINE_LSS_20160224_145240_903000/T1_TSE_TRA_0005/T1_TSE_TRA__0521_014.ima


In [0]:
# COMMAND ----------
# SONG SONG ĐỌC HEADER BẰNG mapInPandas (SERVERLESS-SAFE)
from pyspark.sql import DataFrame

def extract_headers(batch_iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    cols = [f.name for f in schema.fields]
    for pdf in batch_iter:
        out: List[Dict[str, Any]] = []
        for path in pdf["path"].tolist():
            id_, study, series = parse_triplet(path)
            if not id_:
                continue
            try:
                d = pydicom.dcmread(path, stop_before_pixels=True, force=True)
                out.append({
                    "id": id_, 
                    "study": study, 
                    "series": series, 
                    "path": path,

                    "instance_creation_time":    get_and_transform(d, "InstanceCreationTime"),
                    "sop_class_uid":             get_and_transform(d, "SOPClassUID"),
                    "sop_instance_uid":          get_and_transform(d, "SOPInstanceUID"),
                    "study_time":                get_and_transform(d, "StudyTime"),
                    "series_time":               get_and_transform(d, "SeriesTime"),
                    "acquisition_time":          get_and_transform(d, "AcquisitionTime"),
                    "modality":                  get_and_transform(d, "Modality"),
                    "study_description":         get_and_transform(d, "StudyDescription"),
                    "series_description":        get_and_transform(d, "SeriesDescription"),
                    "patient_sex":               get_and_transform(d, "PatientSex"),
                    "patient_age":               get_and_transform(d, "PatientAge"),
                    "patient_size":              get_and_transform(d, "PatientSize"),
                    "patient_weight":            get_and_transform(d, "PatientWeight"),
                    "body_part_examined":        get_and_transform(d, "BodyPartExamined"),
                    "patient_position":          get_and_transform(d, "PatientPosition"),
                    "study_instance_uid":        get_and_transform(d, "StudyInstanceUID"),
                    "series_instance_uid":       get_and_transform(d, "SeriesInstanceUID"),
                    "image_orientation_patient": get_and_transform(d, "ImageOrientationPatient"),
                })
            except Exception as e:
                print(f"Error processing path {path}: {e}")
        yield pd.DataFrame(out, columns=cols)

df_hdrs: DataFrame = df_paths.mapInPandas(extract_headers, schema=schema)

display(df_hdrs.limit(10))
print("Header count:", df_hdrs.count())


id,study,series,path,instance_creation_time,sop_class_uid,sop_instance_uid,study_time,series_time,acquisition_time,modality,study_description,series_description,patient_sex,patient_age,patient_size,patient_weight,body_part_examined,patient_position,study_instance_uid,series_instance_uid,image_orientation_patient
507,L-SPINE_LSS_20160207_151814_188000,T1_TSE_TRA_0006,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0507/L-SPINE_LSS_20160207_151814_188000/T1_TSE_TRA_0006/T1_TSE_TRA__0507_013.ima,153430.834,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.39965233611769829838493556684263231087,151814.188,153430.469,153140.9775,MR,l-spine^lss,t1_tse_tra,F,052Y,1.7,80,LSPINE,FFS,1.3.12.2.1107.5.2.40.50233.30000016020712154209400000004,1.3.12.2.1107.5.2.40.50233.2016020715343041566118458.0.0.0,"[0.9995638907768, 0.0052532265065, 0.0290591098714, -0.0057571175636, 0.999834049003, 0.0172838089464]"
509,L-SPINE_CLINICAL_LIBRARIES_20160626_122428_875000,T1_TSE_SAG_320_0003,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0509/L-SPINE_CLINICAL_LIBRARIES_20160626_122428_875000/T1_TSE_SAG_320_0003/T1_TSE_SAG__0509_004.ima,123900.815,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.408538978611776564131770216051446069709,122428.875,123900.791,123613.3025,MR,l-spine^clinical libraries,t1_tse_sag_320,F,036Y,1.8,80,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000016062610242888200000001,1.3.12.2.1107.5.2.40.50233.2016062612390020335701186.0.0.0,"[3.937E-12, 1, -2.05066E-10, 0.019197426647, -2.05103E-10, -0.9998157124241]"
512,L-SPINE_LSS_20160109_135229_694000,LOCALIZER_0001,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0512/L-SPINE_LSS_20160109_135229_694000/LOCALIZER_0001/LOCALIZER_0_0512_007.ima,135328.872,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.41641500811877926306171657580784107564,135229.694,135301.842,135324.34,MR,l-spine^lss,localizer,M,045Y,1.6,75,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000016011007564967600000078,1.3.12.2.1107.5.2.40.50233.2016010913525483869420626.0.0.0,"[1, 0, 0, 0, 0, -1]"
514,L-SPINE_LSS_20151202_135421_609000,T2_TSE_TRA_384_0004,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0514/L-SPINE_LSS_20151202_135421_609000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0514_001.ima,140555.855,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.374178447811248461532797580032192710316,135421.609,140555.843,140420.975,MR,l-spine^lss,t2_tse_tra_384,F,034Y,1.6,70,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000015120208074935500000010,1.3.12.2.1107.5.2.40.50233.2015120214055561995410876.0.0.0,"[0.9999457895028, -0.0011070479347, -0.0103533811148, -4.290075E-09, 0.9943319046506, -0.1063205690071]"
515,L-SPINE_LSS_20151031_123547_778000,PD_TSE_SAG_320_0015,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0515/L-SPINE_LSS_20151031_123547_778000/PD_TSE_SAG_320_0015/PD_TSE_SAG__0515_013.ima,131929.012,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.82861571811234581907137100071236326468,123547.778,131928.993,131616.905,MR,l-spine^lss,pd_tse_sag_320,F,028Y,1.66,80,KNEE,FFS,1.3.12.2.1107.5.2.40.50233.30000015103107390908500000011,1.3.12.2.1107.5.2.40.50233.2015103113192610015609965.0.0.0,"[0.1509790783917, 0.9885369582813, -1.683821E-08, 0.1376321567553, -0.0210205523265, -0.9902603323403]"
515,L-SPINE_LSS_20151031_123547_778000,T2_TSE_TRA_384_0004,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0515/L-SPINE_LSS_20151031_123547_778000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0515_003.ima,124741.946,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.248593387011324769708720091020465083647,123547.778,124741.932,124606.895,MR,l-spine^lss,t2_tse_tra_384,F,028Y,1.66,80,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000015103107390908500000011,1.3.12.2.1107.5.2.40.50233.2015103112474163193208286.0.0.0,"[0.9998769475388, 0.0007927264941, -0.0156672066804, -3.089038E-08, 0.9987224824641, 0.050531208383]"
517,L-SPINE_LSS_20160307_103636_358000,T1_TSE_SAG_320_0003,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0517/L-SPINE_LSS_20160307_103636_358000/T1_TSE_SAG_320_0003/T1_TSE_SAG__0517_005.ima,104659.268,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.350868759811945901508258620281405469187,103636.358,104659.261,104411.0125,MR,l-spine^lss,t1_tse_sag_320,M,042Y,1.8,80,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000016030707311932800000016,1.3.12.2.1107.5.2.40.50233.2016030710465868375613426.0.0.0,"[3.937E-12, 1, -2.05066E-10, 0.019197426647, -2.05103E-10, -0.9998157124241]"
518,L-SPINE_LSS_20160229_144047_477000,T2_TSE_TRA_384_0004,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0518/L-SPINE_LSS_20160229_144047_477000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0518_016.ima,145235.996,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.199359307111913549208162989080209875247,144047.477,145235.972,145101.3675,MR,l-spine^lss,t2_tse_tra_384,F,044Y,1.7,70,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000016030107191214100000204,1.3.12.2.1107.5.2.40.50233.30000016030107191214100000184,"[0.9997186287984, 0.0028007798681, 0.0235545932995, -0.0148557115074, 0.8480707670834, 0.5296746943682]"
519,L-SPINE_LSS_20151216_113700_531000,T2_TSE_TRA_384_0004,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0519/L-SPINE_LSS_20151216_113700_531000/T2_TSE_TRA_384_0004/T2_TSE_TRA__0519_014.ima,115114.989,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.333431419812195106707039338800406978624,113700.531,115114.979,114939.3175,MR,l-spine^lss,t2_tse_tra_384,F,038Y,1.8,80,LSPINE,HFS,1.2.840.113845.11.1000000002066606790.20151217112630.1006861,1.3.12.2.1107.5.2.40.50233.2015121611511465072303500.0.0.0,"[0.9997779632733, 0.0022500667731, 0.0209514045482, -0.0001206388653, 0.9948775138475, -0.1010876742579]"
521,L-SPINE_LSS_20160224_145240_903000,T1_TSE_TRA_0005,/Volumes/dbms/bronze/lumbar-spine-dataset/MRI-data/0521/L-SPINE_LSS_20160224_145240_903000/T1_TSE_TRA_0005/T1_TSE_TRA__0521_014.ima,150941.083,1.2.840.10008.5.1.4.1.1.4,1.3.6.1.4.1.9590.100.1.2.39373441011652373512764863041956667277,145240.903,150941.067,150740.405,MR,l-spine^lss,t1_tse_tra,F,027Y,1.8,80,LSPINE,HFS,1.3.12.2.1107.5.2.40.50233.30000016022407273500600000025,1.3.12.2.1107.5.2.40.50233.2016022415094073031735354.0.0.0,"[0.9991853616067, -0.0021085289551, -0.040300958507, 0.0183130409544, 0.913583509208, 0.4062386050515]"


Header count: 48194


In [0]:
# COMMAND ----------
# CHUẨN HOÁ KIỂU & GHI DELTA TABLE (BẢN CHỐNG-FAULTY GIÁ TRỊ)
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# Regex cho số thực, có thể có E-notation
NUM_RE = r'[-+]?(?:\d+\.?\d*|\.\d+)(?:[Ee][-+]?\d+)?'

def norm2arr(col):
    # Bỏ ký tự bao quanh & khoảng trắng
    c = F.regexp_replace(col, r'[\[\]\(\)\s]', '')
    # Chuẩn hoá các dấu phân cách về dấu phẩy
    c = F.regexp_replace(c, r'[\\;|]', ',')   # \ ; |
    # Nén nhiều dấu phẩy liên tiếp
    c = F.regexp_replace(c, r',+', ',')
    return F.split(c, ',')

df2 = (
    df_hdrs


    # (tuỳ chọn) khử trùng lặp
    .dropDuplicates(["study_instance_uid","series_instance_uid","sop_instance_uid","path"])
)

(
    df2
    .repartition("id")      # tối ưu ghi theo partition
    .write
    .mode("overwrite")
    .format("delta")
    .partitionBy("id")
    .saveAsTable(f"{CATALOG}.{SCHEMA}.{TABLE}")
)

print(f"Saved to {CATALOG}.{SCHEMA}.{TABLE}. Rows: {df2.count()}")

Saved to dbms.bronze.mri_dicom_new. Rows: 48194
