## Extract the scanner info from the DICOM images, to be used for ComBat harmonization.


In [None]:
import os
import pandas as pd
from DcmData import DcmData
from collections import Counter

import sys
sys.path.append("../")
from utils.myUtils import traversalDir_FirstDir, get_filenames
from mySettings import get_scanner_info_extraction_setting_dict

In [None]:
"""
Define scanner info extraction for TCGA data.
"""
def main_arrange_scanner_info_TCGA(base_path, save_excel_path, lgg_gbm_class):
    """
    Collect the scanner info of every TCGA patient and save it to an Excel file.

    The folder layout walked here is base_path/<patient>/<subfolder>/<series>/.
    For each series folder the first file returned by get_filenames() is read
    with DcmData(...).get_Infos(), and every returned key is stored as
    "<key>_<modality>", where the modality is the series'
    "SeriesDescription_normalized" value. If a patient has several series with
    the same modality, later ones get the suffix "_copy1", "_copy2", ... so
    that their columns do not overwrite each other.

    Args:
        base_path: root folder containing one sub-folder per patient.
        save_excel_path: path of the Excel file to write.
        lgg_gbm_class: value stored unchanged in the "lgg_gbm_class" column
            of every patient row.

    Returns:
        pandas.DataFrame indexed by "patient_id", one row per patient.

    Raises:
        ValueError: if base_path contains no patient folders.
    """
    print("Arrange the scanner info for the MRI images in the folder: \n {}.".format(base_path))

    # First subfolder: patient name list
    patient_name_list = traversalDir_FirstDir(base_path)
    print("\n There are totally {} patients.".format(len(patient_name_list)))
    if len(patient_name_list) == 0:
        # Without any row there is no "patient_id" column to index on; fail
        # with a clear message instead of a KeyError from set_index().
        raise ValueError("No patient folders found in: {}".format(base_path))

    Scanner_info = []
    for patient_name in patient_name_list:
        patient_folder = os.path.join(base_path, patient_name)

        # Series folders sit two levels below the patient folder.
        patient_series_folder_list = []
        patient_series_path_list = []
        for temp_subfolder in traversalDir_FirstDir(patient_folder):
            subfolder_path = os.path.join(patient_folder, temp_subfolder)
            for patient_series_folder in traversalDir_FirstDir(subfolder_path):
                patient_series_folder_list.append(patient_series_folder)
                patient_series_path_list.append(os.path.join(subfolder_path, patient_series_folder))

        if len(patient_series_folder_list) != 4:
            print("\n\n Notice!!! There are {} MRI image sequences for patient {}.".format(len(patient_series_folder_list), patient_name))
            print("{}: {}.".format(patient_name, patient_series_folder_list))

        # For each series of the patient, extract the scanner info.
        patient_scanner_info = {}
        patient_modalities = []
        for patient_series_path in patient_series_path_list:
            # One file is taken as representative of the whole series.
            example_dcm_file = get_filenames(patient_series_path)[0]
            scanner_info = DcmData(example_dcm_file).get_Infos()

            # Repeated modalities get a "_copyN" suffix so their columns stay distinct.
            modality = _unique_modality_name(scanner_info["SeriesDescription_normalized"], patient_modalities)
            patient_modalities.append(modality)

            # Add the modality as a suffix to every key.
            for key, value in scanner_info.items():
                patient_scanner_info[key + "_" + modality] = value

        # Save the most common magnetic field strength among the patient's modalities.
        MagneticFieldStrength_valueCount = Counter(
            value
            for key, value in patient_scanner_info.items()
            if key.startswith("MagneticFieldStrength_normalized_")
        )
        if MagneticFieldStrength_valueCount:
            MagneticFieldStrength_mostCommon = MagneticFieldStrength_valueCount.most_common(1)[0][0]
            is_3T_mostCommon = int(MagneticFieldStrength_mostCommon == 3)
        else:
            # No field strength was collected (e.g. the patient has no series):
            # leave both values unknown rather than failing on an empty Counter.
            MagneticFieldStrength_mostCommon = None
            is_3T_mostCommon = None

        # The final scanner info for the patient.
        patient_info = {"patient_id": patient_name,
                        "lgg_gbm_class": lgg_gbm_class,
                        "MagneticFieldStrength_valueCount": MagneticFieldStrength_valueCount,
                        "MagneticFieldStrength_mostCommon": MagneticFieldStrength_mostCommon,
                        "is_3T_mostCommon": is_3T_mostCommon}
        Scanner_info.append({**patient_info, **patient_scanner_info})

    # Save the scanner info into an excel.
    Scanner_info = pd.DataFrame(Scanner_info)
    Scanner_info.set_index(["patient_id"], inplace=True)
    Scanner_info.to_excel(save_excel_path)
    print("\n Finish arrange the scanner info: \n {}.".format(Scanner_info))

    return Scanner_info


def _unique_modality_name(modality, used_modalities):
    """Return modality, or modality + "_copyN" with the smallest unused N>=1, so it is not in used_modalities."""
    if modality not in used_modalities:
        return modality
    copy_index = 1
    while "{}_copy{}".format(modality, copy_index) in used_modalities:
        copy_index += 1
    return "{}_copy{}".format(modality, copy_index)


"""
Define scanner info extraction for BraTS2021 data.
"""
def main_arrange_scanner_info_BraTS2021(base_path, save_excel_path, modality_dict):
    """
    Collect the scanner info of every BraTS2021 patient and save it to an Excel file.

    The folder layout walked here is base_path/<patient>/<series>/. For each
    series folder the first file returned by get_filenames() is read with
    DcmData(...).get_Infos(). The result has one row per patient (index
    "BraTS2021_<patient>"); the columns of each series are suffixed with
    "_" + modality_dict[<series folder name>].

    Args:
        base_path: root folder containing one sub-folder per patient.
        save_excel_path: path of the Excel file to write.
        modality_dict: maps each expected series folder name to the suffix
            used for that modality's columns.

    Returns:
        pandas.DataFrame indexed by "patient_id" with the columns of all
        modalities side by side.

    Raises:
        ValueError: if base_path contains no patient folders, or if a
            patient's series folders differ from the keys of modality_dict.
    """
    print("Arrange the scanner info for the MRI images in the folder: \n {}.".format(base_path))

    # First subfolder: patient name list
    patient_name_list = traversalDir_FirstDir(base_path)
    print("\n There are totally {} patients.".format(len(patient_name_list)))
    if len(patient_name_list) == 0:
        raise ValueError("No patient folders found in: {}".format(base_path))

    expected_series_folders = sorted(modality_dict)

    Scanner_info = []
    for patient_name in patient_name_list:
        patient_folder = os.path.join(base_path, patient_name)

        # sorted() returns the sorted list; list.sort() returns None, so
        # comparing two .sort() results would always be True.
        patient_series_folders = sorted(traversalDir_FirstDir(patient_folder))
        if patient_series_folders != expected_series_folders:
            raise ValueError(
                "Patient {} has series folders {}, expected {}.".format(
                    patient_name, patient_series_folders, expected_series_folders))

        for patient_series in patient_series_folders:
            patient_info = {"patient_id": "BraTS2021_" + patient_name,
                            "modality": patient_series}
            patient_series_path = os.path.join(patient_folder, patient_series)
            # One file is taken as representative of the whole series.
            example_dcm_file = get_filenames(patient_series_path)[0]
            scanner_info = DcmData(example_dcm_file).get_Infos()

            Scanner_info.append({**patient_info, **scanner_info})

    # Convert into a dataframe (one row per patient and series).
    Scanner_info = pd.DataFrame(Scanner_info)

    # Build one patient-indexed table per modality. The modalities come from
    # modality_dict rather than from whatever the last loop iteration left behind.
    Scanner_info_modality_list = []
    for modality in expected_series_folders:
        # Non-inplace set_index avoids modifying a filtered slice in place.
        Scanner_info_modality = Scanner_info[Scanner_info["modality"] == modality].set_index(["patient_id"])
        Scanner_info_modality = Scanner_info_modality.add_suffix('_' + modality_dict[modality])
        Scanner_info_modality_list.append(Scanner_info_modality)

    Scanner_info_all_modalities = pd.concat(Scanner_info_modality_list, axis=1, join='outer')

    # Save the scanner info into an excel.
    Scanner_info_all_modalities.to_excel(save_excel_path)
    print("\n Finish arrange the scanner info: \n {}.".format(Scanner_info_all_modalities))

    return Scanner_info_all_modalities

### Main

In [None]:
# Load the extraction settings: one entry per dataset to process.
scanner_info_extraction_setting_dict = get_scanner_info_extraction_setting_dict()

for name, scanner_info_extraction_setting in scanner_info_extraction_setting_dict.items():
    print("\n\n=== Extract scanner setting info for {}. ===".format(name))

    dcm_image_folder = scanner_info_extraction_setting["dcm_image_folder"]
    base_results_path = scanner_info_extraction_setting["base_results_path"]

    # Make sure the results folder exists before anything is written into it.
    if not os.path.exists(base_results_path):
        os.makedirs(base_results_path)
    save_scanner_info_excel_path = os.path.join(base_results_path, "Scanner_info_"+ name +".xlsx")

    # Every setting whose description is not "TCGA" is handled by the BraTS2021 routine.
    if scanner_info_extraction_setting["description"] != "TCGA":
        modality_dict = scanner_info_extraction_setting["modality_dict"]
        main_arrange_scanner_info_BraTS2021(dcm_image_folder, save_scanner_info_excel_path, modality_dict)
    else:
        lgg_gbm_class = scanner_info_extraction_setting["lgg_gbm_class"]
        main_arrange_scanner_info_TCGA(dcm_image_folder, save_scanner_info_excel_path, lgg_gbm_class)

    print("\n Finish extracting scanner info for {}.".format(name))