<a href="https://colab.research.google.com/github/ImagingDataCommons/Cloud-Resources-Workflows/blob/main/Notebooks/Totalsegmentator/dicomSEGMaps/slicerMappingsTotalSegmentator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

###**Importing Packages**

In [None]:
import pandas as pd
import json
import yaml
import os
import sys
from pathlib import Path
import time
import ast
from natsort import natsorted

###**Runtime Environment**

In [None]:
# Record where and when this notebook runs.
curr_dir = Path().absolute()

# Timestamps are reported in the US/Eastern zone; tzset() makes the time
# module re-read the TZ variable that was just set.
os.environ['TZ'] = 'US/Eastern'
time.tzset()

# strftime() formats the current local time when no time tuple is passed.
current_time = time.strftime('%a %b %d %H:%M:%S %Y')
print(current_time)
print(f"\nCurrent directory :{curr_dir}")
python_version = sys.version.split('\n')[0]
print("Python version    :", python_version)

###**Download TotalSegmentator mappings from the SlicerTotalSegmentator extension source**

In [None]:
try:
  os.remove(f'{curr_dir}/TotalSegmentator.py')
except OSError:
  pass
!wget https://raw.githubusercontent.com/lassoan/SlicerTotalSegmentator/main/TotalSegmentator/TotalSegmentator.py

###**Data Wrangling**

In [None]:
try:
  os.remove(f'{curr_dir}/map_to_binary.py')
except OSError:
  pass
!wget https://raw.githubusercontent.com/wasserth/TotalSegmentator/master/totalsegmentator/map_to_binary.py
import map_to_binary
label_id_body_part_data = map_to_binary.class_map['total']
label_id_body_part_data_df = pd.DataFrame(list(label_id_body_part_data.items()), columns=['labelID', 'Structure'])
label_id_body_part_data_df

In [None]:
# Read the downloaded Slicer module as text and parse it into an AST; the
# module is only inspected, never imported or executed.
content = Path('TotalSegmentator.py').read_text()
tree = ast.parse(content)

class TotalSegmentatorLogicFinder(ast.NodeVisitor):
    """Find the ``totalSegmentatorLabelTerminology`` literal in a parsed module.

    After ``visit(tree)``, ``result`` holds the evaluated literal assigned to
    an attribute named ``totalSegmentatorLabelTerminology`` inside
    ``TotalSegmentatorLogic.__init__``. It stays ``None`` when no such
    assignment is found. If several assignments exist, the last one wins.

    Raises:
        ValueError: (from ``ast.literal_eval``) when the assigned value is not
            a plain Python literal.
    """

    def __init__(self):
        self.result = None

    @staticmethod
    def _terminology_value(statement):
        """Return the value node assigned to the terminology attribute, or ``None``."""
        if isinstance(statement, ast.Assign):
            # Chained assignments (``a = self.x = {...}``) carry several targets.
            targets = statement.targets
        elif isinstance(statement, ast.AnnAssign) and statement.value is not None:
            # Annotated form: ``self.x: dict = {...}``.
            targets = [statement.target]
        else:
            return None
        for target in targets:
            if isinstance(target, ast.Attribute) and target.attr == 'totalSegmentatorLabelTerminology':
                return statement.value
        return None

    def visit_ClassDef(self, node):
        """Inspect ``TotalSegmentatorLogic.__init__`` and keep walking nested nodes."""
        if node.name == 'TotalSegmentatorLogic':
            for body_node in node.body:
                if isinstance(body_node, ast.FunctionDef) and body_node.name == '__init__':
                    for statement in body_node.body:
                        value_node = self._terminology_value(statement)
                        if value_node is not None:
                            # literal_eval accepts an AST node directly, so no
                            # unparse/re-parse round trip is needed.
                            self.result = ast.literal_eval(value_node)
        self.generic_visit(node)

from natsort import natsort_keygen

# Pull the terminology dictionary out of the parsed Slicer module.
finder = TotalSegmentatorLogicFinder()
finder.visit(tree)
totalSegmentatorLabelTerminology = finder.result
if totalSegmentatorLabelTerminology is None:
    # Fail with a clear message instead of an AttributeError on None below.
    raise ValueError(
        "No literal 'totalSegmentatorLabelTerminology' assignment was found in "
        "TotalSegmentatorLogic.__init__ of TotalSegmentator.py"
    )

# Each terminology value is a '~'-separated string whose fields are themselves
# '^'-separated. Only the first four fields are used; entry i of this layout
# names the columns filled from the leading subfields of field i.
code_parts = ('CodingSchemeDesignator', 'CodeValue', 'CodeMeaning')
terminology_columns = [
    ['SegmentAlgorithmName', 'SegmentAlgorithmType'],
    [f'SegmentedPropertyCategoryCodeSequence.{part}' for part in code_parts],
    [f'SegmentedPropertyTypeCodeSequence.{part}' for part in code_parts],
    [f'SegmentedPropertyTypeModifierCodeSequence.{part}' for part in code_parts],
]

data = []
for key, value in totalSegmentatorLabelTerminology.items():
    row = {'Structure': key}
    for field, columns in zip(value.split('~'), terminology_columns):
        subfields = field.split('^')
        # A field with too few subfields is skipped entirely.
        if len(subfields) >= len(columns):
            row.update(zip(columns, subfields))
    data.append(row)

df = pd.DataFrame(data)

# Natural sort of structure names.
df = df.sort_values('Structure', key=natsort_keygen()).reset_index(drop=True)

# Attach the TotalSegmentator label ids by structure name (inner join).
slicer_merged_df = pd.merge(label_id_body_part_data_df, df, on='Structure')
slicer_merged_df

###**MHubAI's dcmqi JSON config as a template**

In [None]:
try:
  os.remove(f'{curr_dir}/dicomseg_metadata_whole.py')
except OSError:
  pass
!wget https://raw.githubusercontent.com/MHubAI/models/main/models/totalsegmentator/config/dicomseg_metadata_whole.json

with open('/content/dicomseg_metadata_whole.json', 'r') as file:
    json_data = json.load(file)

mhubai = pd.json_normalize(json_data['segmentAttributes'][0]).reset_index(drop=True)
mhubai=mhubai[['SegmentAlgorithmName', 'SegmentAlgorithmType', 'SegmentDescription',
       'labelID', 'recommendedDisplayRGBValue',]]
mhubai

###**Generating Updated dcmqi json config using Slicer Mappings**

In [None]:
# Combine the MHubAI template columns with the Slicer codes by label id, drop
# the structure name, stamp the algorithm name and order rows by label id.
merged_df = (
    mhubai
    .merge(slicer_merged_df, on='labelID')
    .drop(columns='Structure')
    .assign(SegmentAlgorithmName='TotalSegmentator v1.5.5')
    .sort_values('labelID')
)
merged_df

In [None]:
CODE_ITEM_KEYS = ('CodingSchemeDesignator', 'CodeValue', 'CodeMeaning')


def _pop_code_sequence(segment, sequence_name, optional=False):
    """Move the flattened ``<sequence_name>.<key>`` entries of *segment* into one dict.

    The three flattened keys are removed from *segment* and returned as a dict
    keyed by ``CodingSchemeDesignator``, ``CodeValue`` and ``CodeMeaning``.

    Args:
        segment: one record produced from ``merged_df`` (mutated in place).
        sequence_name: prefix of the flattened keys.
        optional: when True a missing key yields ``None``; otherwise a missing
            key raises ``KeyError``.
    """
    code_sequence = {}
    for item_key in CODE_ITEM_KEYS:
        flat_key = f'{sequence_name}.{item_key}'
        code_sequence[item_key] = segment.pop(flat_key, None) if optional else segment.pop(flat_key)
    return code_sequence


def _has_code(code_sequence):
    """Return True when at least one entry is a non-blank string.

    Missing modifier values arrive as ``None`` after the JSON round trip
    (NaN is serialised as null), so they must not be ``.strip()``-ed blindly.
    """
    return any(isinstance(value, str) and value.strip() for value in code_sequence.values())


# The to_json/loads round trip yields plain Python records.
merged_df_json = json.loads(merged_df.to_json(orient='records'))

for segment in merged_df_json:
    # Category and type are always written as nested code sequences.
    for sequence_name in ('SegmentedPropertyCategoryCodeSequence',
                          'SegmentedPropertyTypeCodeSequence'):
        segment[sequence_name] = _pop_code_sequence(segment, sequence_name)

    # The modifier is written only when it carries a value; its flattened keys
    # are removed from the record either way.
    modifier = _pop_code_sequence(segment, 'SegmentedPropertyTypeModifierCodeSequence', optional=True)
    if _has_code(modifier):
        segment['SegmentedPropertyTypeModifierCodeSequence'] = modifier

final_json = {
    "BodyPartExamined": "CHEST",
    "ClinicalTrialCoordinatingCenterName": "dcmqi",
    "ClinicalTrialSeriesID": "0",
    "ClinicalTrialTimePointID": "1",
    "ContentCreatorName": "IDC",
    "ContentDescription": "Image segmentation",
    "ContentLabel": "SEGMENTATION",
    "InstanceNumber": "1",
    "SeriesDescription": "TotalSegmentator Segmentation",
    "SeriesNumber": "42",
    # A list holding one list of segment records.
    "segmentAttributes": [merged_df_json]
}
with open('dicomseg_metadata_whole_slicerAsRef.json', 'w') as file:
    json.dump(final_json, file, indent=4)
final_json

In [None]:
# Write the same metadata dictionary out as YAML.
with open('dicomseg_metadata_whole_slicerAsRef.yaml', 'w') as yaml_file:
    yaml.dump(final_json, stream=yaml_file)


###**Verifying Slicer mappings against SNOMED CT mappings**

In [None]:
import requests

base_url = 'https://browser.ihtsdotools.org/snowstorm/snomed-ct/MAIN'
headers = {'User-Agent': 'Python'}

# Language reference set id used to decide whether a description is PREFERRED
# or ACCEPTABLE; replace it with the id for your desired language.
language_refset = '900000000000509007'

# Columns of `df` that hold the codes to look up; replace these with the
# actual column names from your dataframe.
sct_codes = ['SegmentedPropertyCategoryCodeSequence.CodeValue',
             'SegmentedPropertyTypeCodeSequence.CodeValue',
             'SegmentedPropertyTypeModifierCodeSequence.CodeValue']

# Upper bound for a single HTTP request so a stalled connection cannot hang the cell.
REQUEST_TIMEOUT_SECONDS = 30


def _is_blank_code(sct_code):
    """Return True for codes that cannot be looked up (NaN/None or blank text)."""
    return pd.isnull(sct_code) or not str(sct_code).strip()


def _fetch_snomed_terms(session, sct_code):
    """Return ``(preferred_terms, acceptable_terms)`` for one concept id.

    Only active descriptions are considered; a description is classified by its
    acceptability in ``language_refset``. At most 50 descriptions are requested.

    Raises:
        requests.HTTPError: when the server answers with an error status.
    """
    response = session.get(
        f'{base_url}/descriptions',
        params={'conceptId': sct_code, 'limit': 50},
        headers=headers,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()

    preferred_terms, acceptable_terms = [], []
    for desc in response.json()['items']:
        if not desc['active']:
            continue
        acceptability = desc['acceptabilityMap'].get(language_refset)
        if acceptability == 'PREFERRED':
            preferred_terms.append(desc['term'])
        elif acceptability == 'ACCEPTABLE':
            acceptable_terms.append(desc['term'])
    return preferred_terms, acceptable_terms


with requests.Session() as session:
    for sct_code_column in sct_codes:
        preferred_terms_column = sct_code_column + '.PreferredTerms'
        acceptable_terms_column = sct_code_column + '.AcceptableTerms'

        # One request per distinct code. Missing or empty codes are never sent
        # to the server; they get empty term lists below.
        sct_code_terms = {
            sct_code: _fetch_snomed_terms(session, sct_code)
            for sct_code in df[sct_code_column].unique()
            if not _is_blank_code(sct_code)
        }

        row_terms = [
            ([], []) if _is_blank_code(sct_code) else sct_code_terms[sct_code]
            for sct_code in df[sct_code_column]
        ]
        # dtype=object keeps each cell as a Python list of terms.
        df[preferred_terms_column] = pd.Series(
            [preferred for preferred, _ in row_terms], index=df.index, dtype=object)
        df[acceptable_terms_column] = pd.Series(
            [acceptable for _, acceptable in row_terms], index=df.index, dtype=object)

# NOTE(review): `column_order` is defined but never applied to `df` anywhere in
# this notebook; confirm whether `df = df[column_order]` was intended before
# the rename below (it would change the column order of the exported CSV).
column_order = ['Structure',
                'SegmentedPropertyCategoryCodeSequence.CodingSchemeDesignator',
                'SegmentedPropertyCategoryCodeSequence.CodeValue',
                'SegmentedPropertyCategoryCodeSequence.CodeMeaning',
                'SegmentedPropertyCategoryCodeSequence.CodeValue.PreferredTerms',
                'SegmentedPropertyCategoryCodeSequence.CodeValue.AcceptableTerms',
                'SegmentedPropertyTypeCodeSequence.CodingSchemeDesignator',
                'SegmentedPropertyTypeCodeSequence.CodeValue',
                'SegmentedPropertyTypeCodeSequence.CodeMeaning',
                'SegmentedPropertyTypeCodeSequence.CodeValue.PreferredTerms',
                'SegmentedPropertyTypeCodeSequence.CodeValue.AcceptableTerms',
                'SegmentedPropertyTypeModifierCodeSequence.CodingSchemeDesignator',
                'SegmentedPropertyTypeModifierCodeSequence.CodeValue',
                'SegmentedPropertyTypeModifierCodeSequence.CodeMeaning',
                'SegmentedPropertyTypeModifierCodeSequence.CodeValue.PreferredTerms',
                'SegmentedPropertyTypeModifierCodeSequence.CodeValue.AcceptableTerms']

new_column_names = {'SegmentedPropertyCategoryCodeSequence.CodeValue.PreferredTerms': 'SegmentedPropertyCategoryCodeSequence.CodeValue.SNOMEDCTPreferredTerms',
                    'SegmentedPropertyCategoryCodeSequence.CodeValue.AcceptableTerms': 'SegmentedPropertyCategoryCodeSequence.CodeValue.SNOMEDCTAcceptableTerms',
                    'SegmentedPropertyTypeCodeSequence.CodeValue.PreferredTerms': 'SegmentedPropertyTypeCodeSequence.CodeValue.SNOMEDCTPreferredTerms',
                    'SegmentedPropertyTypeCodeSequence.CodeValue.AcceptableTerms': 'SegmentedPropertyTypeCodeSequence.CodeValue.SNOMEDCTAcceptableTerms',
                    'SegmentedPropertyTypeModifierCodeSequence.CodeValue.PreferredTerms': 'SegmentedPropertyTypeModifierCodeSequence.CodeValue.SNOMEDCTPreferredTerms',
                    'SegmentedPropertyTypeModifierCodeSequence.CodeValue.AcceptableTerms': 'SegmentedPropertyTypeModifierCodeSequence.CodeValue.SNOMEDCTAcceptableTerms'}

# Rename the term columns so their names state that the terms come from SNOMED CT.
df = df.rename(columns=new_column_names)
df

In [None]:
# Export the mapping table, including the SNOMED CT term columns, for review.
snomed_review_csv = 'slicerMappingsToSnomedMappings.csv'
df.to_csv(snomed_review_csv)

In [None]:
# Keep only the structure name and the three code triplets (category, type,
# type modifier); every other column is dropped.
mapping_columns = ['Structure'] + [
    f'{sequence_name}.{code_part}'
    for sequence_name in ('SegmentedPropertyCategoryCodeSequence',
                          'SegmentedPropertyTypeCodeSequence',
                          'SegmentedPropertyTypeModifierCodeSequence')
    for code_part in ('CodingSchemeDesignator', 'CodeValue', 'CodeMeaning')
]
df = df.loc[:, mapping_columns]
df

In [None]:
# Export the reduced structure-to-code mapping table.
total_mapping_csv = 'TotalSegmentator v1 SCT mapping - total.csv'
df.to_csv(total_mapping_csv)

###**Trying to use Slicer's json config (only has 87 parts)**

In [None]:
try:
  os.remove(f'{curr_dir}/SegmentationCategoryTypeModifier-TotalSegmentator.term.json')
except OSError:
  pass
!wget https://raw.githubusercontent.com/lassoan/SlicerTotalSegmentator/main/TotalSegmentator/Resources/SegmentationCategoryTypeModifier-TotalSegmentator.term.json




In [None]:
# Flatten Slicer's terminology JSON into a table with one row per
# (type, modifier) pair, or one row per type when the type has no modifiers.
with open('SegmentationCategoryTypeModifier-TotalSegmentator.term.json', 'r') as terminology_file:
    json_data = json.load(terminology_file)

rows = []
for category in json_data['SegmentationCodes']['Category']:
    for type_entry in category['Type']:
        type_fields = {
            field: type_entry[field]
            for field in ('CodeValue', 'CodeMeaning', 'CodingSchemeDesignator')
        }
        if 'Modifier' not in type_entry:
            # Only the three type-level code fields are recorded here, so these
            # rows carry no '3dSlicerLabel' and are removed by the filter below.
            # NOTE(review): confirm that types without modifiers are meant to be excluded.
            rows.append(type_fields)
            continue
        for modifier in type_entry['Modifier']:
            modifier_row = {
                **type_fields,
                'ModifierCodeValue': modifier['CodeValue'],
                'ModifierCodeMeaning': modifier['CodeMeaning'],
                'ModifierCodingSchemeDesignator': modifier['CodingSchemeDesignator'],
            }
            # Optional modifier attributes are copied only when present.
            for optional_key in ('3dSlicerLabel', 'recommendedDisplayRGBValue'):
                if optional_key in modifier:
                    modifier_row[optional_key] = modifier[optional_key]
            rows.append(modifier_row)

df = pd.DataFrame(rows)

# Use the same column names as the mapping tables built earlier in the notebook.
df = df.rename(columns={
    'CodingSchemeDesignator': 'SegmentedPropertyTypeCodeSequence.CodingSchemeDesignator',
    'CodeValue': 'SegmentedPropertyTypeCodeSequence.CodeValue',
    'CodeMeaning': 'SegmentedPropertyTypeCodeSequence.CodeMeaning',
    'ModifierCodingSchemeDesignator': 'SegmentedPropertyTypeModifierCodeSequence.CodingSchemeDesignator',
    'ModifierCodeValue': 'SegmentedPropertyTypeModifierCodeSequence.CodeValue',
    'ModifierCodeMeaning': 'SegmentedPropertyTypeModifierCodeSequence.CodeMeaning'
})

# Keep only the rows that have a 3D Slicer label.
filtered_df = df.dropna(subset=['3dSlicerLabel'])
filtered_df