In [1]:
import json
import os
import glob
import re
from typing import Optional as Opt, Tuple, Union, List
from PIL import Image
import cv2
import io
import base64
import numpy as np
import requests
import hashlib
import pandas as pd

from datetime import datetime
from dateutil import parser
import pytz

import uuid
import warnings

import pydicom
from pydicom.dataset import FileDataset as pydicomFileDataset, FileMetaDataset as pydicomFileMetaDataset
from pydicom import Dataset as pydicomDataset, Sequence, dcmread, dcmwrite
from pydicom.dataelem import DataElement
from pydicom.datadict import dictionary_VR, dictionary_has_tag
from pydicom.uid import UID as pydicomUID, generate_uid as generate_pydicomUID


from pyxnat import Interface, schema

from pathlib import Path, PurePosixPath

import shutil
import tempfile

from src.utilities import MetaTables, USCentralDateTime, XNATLogin, XNATConnection, ImageHash
from src.xnat_experiment_data import *
from src.xnat_scan_data import *
from src.xnat_resource_data import ORDataIntakeForm


login_info, verbose = { 'Username': 'dmattioli', 'Password': 'PooPoopoopoo123$', 'Url': 'https://rpacs.iibi.uiowa.edu/xnat/' }, True
validated_login = XNATLogin( login_info, verbose=verbose )
xnat_connection = XNATConnection( validated_login, stay_connected=True, verbose=verbose )
metatables = MetaTables( validated_login, xnat_connection, verbose=True )

-- Validated XNATLogin --
	User: dmattioli
	Server: https://rpacs.iibi.uiowa.edu/xnat/

-- XNAT Connection: Open --
	Signed-in as:	dmattioli
	Project:	<Project Object> GROK_AHRQ_Data `AHRQ-GROK Intraoperative Trauma and Arthroscopy Data` (private) 24 subjects  (owner: domattioli) (created on 2024-08-12 11:12:38.965) https://rpacs.iibi.uiowa.edu/xnat//data/projects/GROK_AHRQ_Data?format=html

-- MetaTables -- Accessed by: DMATTIOLI
   *Last Modified: 2024-08-28T15:24:22.368709-05:00
	       Table Name # Items # Columns
	 REGISTERED_USERS       7         4
	ACQUISITION_SITES       3         4
	           GROUPS      28         4
	         SUBJECTS      24         6
	     IMAGE_HASHES     173         6
	         SURGEONS      43         7


### SourceRFSession

In [20]:
from src.xnat_experiment_data import ExperimentData
from src.xnat_scan_data import SourceDicomDeIdentified
class SourceRFSession(ExperimentData):
    """
    A class representing the XNAT Experiment for RadioFluoroscopic (RF) Source Images. Inherits from ExperimentData. Intended for structuring trauma cases with fluoroscopic image sequences.

    Inputs:
    - intake_form (ORDataIntakeForm): digitized json-formatted form detailing the surgical data to be uploaded to XNAT. This is also uploaded with the source data.
    - metatables (MetaTables): metatables object containing the user's validated metatables configuration.

    Attributes:
    intake_form (ORDataIntakeForm): digitized json-formatted form detailing the surgical data to be uploaded to XNAT. This is also uploaded.
    tmp_source_data_dir (Path): local tmp directory in which all source data is temporarily stored before being pushed to XNAT.
    df (pd.DataFrame): dataframe containing all relevant data for the experiment session, e.g., a fluorosequence of 15 images will have 15 rows in df.
    is_valid (bool): flag indicating whether the experiment session is valid and can be pushed to XNAT.
    schema_prefix_str (str): string prefix for the schema type of the experiment session, e.g., 'rf' for radio fluoroscopic data.
    scan_type_label (str): string label for the type of scan data, e.g., 'DICOM' for radio fluoroscopic data

    Methods (unique to this inherited class):
    write(): Writes the SourceRFSession to a zipped folder in a temporary local directory, which can then be pushed to XNAT.
    - See docstring for ExperimentData for other methods.

    Example Usage:
    SourceRFSession( intake_form=ORDataIntakeForm, metatables=MetaTables )
    """

    def __init__( self, intake_form: ORDataIntakeForm, metatables: MetaTables ) -> None:
        """
        Initializes the SourceESVSession object.
        Populate a dataframe to represent all post-op images and intraoperative videos in the inputted folder. Check the validity of the session and mine metadata for the session.
        """
        super().__init__( intake_form=intake_form, invoking_class='SourceRFSession' ) # Call the __init__ method of the base class
        self._populate_df( metatables=metatables )
        self._check_session_validity( metatables=metatables )
        if self.is_valid:   self._mine_session_metadata() # necessary for publishing to xnat.


    def _populate_df( self, metatables: MetaTables ):
        self._init_rf_session_dataframe()
        all_ffns = self._all_dicom_ffns()
        self._df = self._df.reindex( np.arange( len( all_ffns ) ) )
        for idx, ffn in enumerate( all_ffns ):
            fn, ext = os.path.splitext( os.path.basename( ffn ) )
            if ext != '.dcm':
                self._df.loc[idx, ['FN', 'EXT', 'IS_VALID']] = [fn, ext, False]
                continue
            deid_dcm = SourceDicomDeIdentified( dcm_ffn=ffn, metatables=metatables, intake_form=self.intake_form )
            self._df.loc[idx, ['FN', 'EXT', 'OBJECT', 'IS_VALID']] = [fn, ext, deid_dcm, deid_dcm.is_valid]
            if deid_dcm.is_valid:
                dt_data = self._query_dicom_series_time_info( deid_dcm )
                self._df.loc[idx, ['DATE', 'INSTANCE_TIME', 'SERIES_TIME', 'INSTANCE_NUM']] = dt_data

        # Need to check within-case for duplicates -- apparently those do exist.
        hash_strs = set()
        for idx, row in self.df.iterrows():
            if row['IS_VALID']:
                if row['OBJECT'].image.hash_str in hash_strs:
                    self._df.at[idx, 'IS_VALID'] = False
                else:
                    hash_strs.add( row['OBJECT'].image.hash_str )
    

    def _check_session_validity( self, metatables: MetaTables ): # Invalid only when empty or all shots are invalid -- to-do: may also want to check that instance num and time are monotonically increasing
        print( self.df['IS_VALID'].any() )
        print( not metatables.item_exists( table_name='SUBJECTS', item_name=self.intake_form.uid ) )
        self._is_valid = self.df['IS_VALID'].any() and not metatables.item_exists( table_name='SUBJECTS', item_name=self.intake_form.uid )
        


    def _mine_session_metadata( self ):
        assert self.df.empty is False, 'Dataframe of dicom files is empty.'
        self._write_relevant_old_metadata_to_new_metadata()
        
        # For each row, generate a new file name now that we have a session label.
        for idx, row in self.df.iterrows():
            if row['IS_VALID']:
                # Generate a new file name for each shot in the session given its instance number, then overwrite metadata to ensure consistency throughout all shots.
                self._df.at[idx, 'NEW_FN'] = row['OBJECT'].generate_source_image_file_name( str( row['INSTANCE_NUM'] ), self.intake_form.uid )

        # self._derive_acquisition_site_info() # to-do: should warn the user that any mined info is inconsistent with their input
        self._df = self.df.sort_values( by='NEW_FN', inplace=False )


    # ---------------------------------_populate_df Helper Methods---------------------------------
    def _all_dicom_ffns( self ) -> list:
        # Filter out files with extensions other than .dcm
        all_ffns = [f for f in self.intake_form.relevant_folder.rglob("*") if f.suffix.lower() == '.dcm' or f.suffix == '']
        return [f for f in all_ffns if f.suffix.lower() == '.dcm' or f.suffix == '']

    def _init_rf_session_dataframe( self ):
        df_cols = { 'FN': 'str', 'EXT': 'str', 'NEW_FN': 'str', 'OBJECT': 'object', 'IS_VALID': 'bool',
                    'DATE': 'str', 'SERIES_TIME': 'str', 'INSTANCE_TIME': 'str', 'INSTANCE_NUM': 'str' }
        self._df = pd.DataFrame( {col: pd.Series( dtype=dt ) for col, dt in df_cols.items()} )

    def _query_dicom_series_time_info( self, deid_dcm: SourceDicomDeIdentified ) -> list:
        dt_data = [deid_dcm.datetime.date, deid_dcm.datetime.time, None, deid_dcm.metadata.InstanceNumber]
        if 'SeriesTime' in deid_dcm.metadata:    dt_data[2] = deid_dcm.metadata.SeriesTime
        if 'ContentTime' in deid_dcm.metadata:   dt_data[2] = deid_dcm.metadata.ContentTime
        if 'StudyTime' in deid_dcm.metadata:     dt_data[2] = deid_dcm.metadata.StudyTime
        return dt_data

    def _write_relevant_old_metadata_to_new_metadata( self ):
        '''Write all found metadata to its own field.'''
        for idx, row in self.df.iterrows():
            if row['IS_VALID']: # Iterate through all fields in the derived_metadata dictionary
                for key, value in row['OBJECT']._derived_metadata.items():
                    # Determine the appropriate VR based on the length of the value
                    if isinstance( value, str):
                        vr = 'LO' if len(value) <= 64 else 'LT'
                        # Add each key-value pair as a new private tag in the metadata
                        tag_number = 0x1000 + list(row['OBJECT']._derived_metadata.keys()).index(key) + 1
                        self._df.at[idx, 'OBJECT'].metadata.add_new((0x0019, tag_number), vr, f"{key}: {value}")
                print(self._df.at[idx, 'OBJECT'].metadata)

    # ---------------------------------_populate_df Helper Methods---------------------------------

    def __str__( self ) -> str:
        select_cols = ['NEW_FN', 'IS_VALID']
        df, intake_form = self.df[select_cols].copy(), self.intake_form
        if self.is_valid:
            return f' -- {self.__class__.__name__} --\nUID:\t{intake_form.uid}\nAcquisition Site:\t{intake_form.acquisition_site}\nGroup:\t\t\t{intake_form.group}\nDate-Time:\t\t{intake_form.datetime}\nValid:\t\t\t{self.is_valid}\n{df.head()}\n...\n{df.tail()}'
        else:
            return f' -- {self.__class__.__name__} --\nUID:\t{None}\nAcquisition Site:\t{intake_form.acquisition_site}\nGroup:\t\t\t{intake_form.group}\nDate-Time:\t\t{None}\nValid:\t\t\t{self.is_valid}\n{df.head()}\n...\n{df.tail()}'

    def write( self, metatables: MetaTables, verbose: Opt[bool] = False ) -> None:#Tuple[dict, MetaTables]:
        pass


pn = Path( r'R:\Anderson_Colaborations\AHRQ - 11287500\Fluoroscopy\Imported_Cases\DHS_20150624_1\DICOM' )
pn_intake = Path( r'C:\Users\dmattioli\OneDrive - University of Iowa\Downloads' )
tst = ORDataIntakeForm( metatables=metatables, login=validated_login, input_data=pn_intake, verbose=True )
src_rf = SourceRFSession( intake_form=tst, metatables=metatables )
print( src_rf )



...Processing OR Data Intake Form...

	...Initializing OR Intake From from "C:\Users\dmattioli\OneDrive - University of Iowa\Downloads\RECONSTRUCTED_OR_DATA_INTAKE_FORM.json"...
	--- Only minimally required fields were found in the inputted form.
	-- SUCCESS -- OR Data Intake Form saved to:	C:\Users\DMATTI~1\AppData\Local\Temp\XNAT_Interact\2_25_15887853616910988195665473909446825034\RECONSTRUCTED_OR_DATA_INTAKE_FORM.json

True
True
Dataset.file_meta -------------------------------
(0002, 0000) File Meta Information Group Length  UL: 212
(0002, 0001) File Meta Information Version       OB: b'\x00\x01'
(0002, 0002) Media Storage SOP Class UID         UI: X-Ray Radiofluoroscopic Image Storage
(0002, 0003) Media Storage SOP Instance UID      UI: 1.3.6.1.4.1.9590.100.1.2.145749889913277811811774629630441005588
(0002, 0010) Transfer Syntax UID                 UI: Explicit VR Little Endian
(0002, 0012) Implementation Class UID            UI: 1.3.6.1.4.1.9590.100.1.3.100.9.4
(0002, 0013) Imp

*** leaving off on 10/30/2024:
- need to continue w rfsession
    - need to pull from each image the original times, dates, and instance numbers
    - need to overwrite all dates -- the intake form provides that
    - need to overwrite the study and series times -- intake form should provide epic_start time
    - need to mine content time, cross reference/figure out what to do with incongruence instance numbers. lets not get too in the weeds here, let the analysts figure out errors
    - need to overwrite Study Instance UID and Series Instance UID to be the new uid, writing the old ones to their own thing.
    - need to provide text info indicating "old xxx"
    - need to redact "Study ID"
    - need to redact "Accession Number"
    - overwrite "Number of Study Related Instances" with the number of images found.
    - need to document all of this

### Structuring DHS Results

In [54]:
ffn = r'R:\Anderson_Colaborations\AHRQ - 11287500\Fluoroscopy\Imported_Cases\DHS_20140804_1\DICOM\DICOM_Results.json'

def parse_text_file(file_path):
    # Iterate over the lines and process each JSON object
    with open(file_path, 'r') as file:  lines = file.readlines()
    filenames, views, femoral_heads, femoral_necks, wires, sides, dicom_images = [], [], [], [], [], [], []
    for line in lines:
        line = line.strip()
        if line.startswith('{'):
            # Parse the JSON object
            data = json.loads(line)
            # Extract the required fields
            filenames.append(data.get('FileName', ''))
            views.append(data.get('View', ''))
            femoral_heads.append(data['Result'].get('Femoral_Head', ''))
            femoral_necks.append(data['Result'].get('Femoral_Neck', ''))
            wires.append(data['Result'].get('Wire', ''))
            sides.append(data.get('Side', ''))

            # dicom_file_path = Path(ffn).parent / f"{data.get('FileName', '')}.dcm"
            # try: # Read the DICOM file
            #     dicom_image = pydicom.dcmread(dicom_file_path).pixel_array
            #     dicom_images.append(dicom_image)
            # except Exception as e:
            #     dicom_images.append(None)
            #     print(f"Error reading DICOM file {dicom_file_path}: {e}")

    # Create a DataFrame with the extracted data
    df = pd.DataFrame({
        'filenames': filenames,
        'view': views,
        'femoral_head': femoral_heads,
        'femoral_neck': femoral_necks,
        'wire': wires,
        'side': sides#,
        # 'dicom_image': dicom_images
    })
    return df

df = parse_text_file(ffn)
print(df)

     filenames view                                       femoral_head  \
0   002_120821   AP  {'Left_XY': [476.88, 630.91], 'Top_XY': [668.1...   
1   008_120800   AP  {'Left_XY': [538.15, 491.7], 'Top_XY': [674.67...   
2   011_120834   AP  {'Left_XY': [539.28, 369.34], 'Top_XY': [694.5...   
3   038_130851   AP  {'Left_XY': [534.75, 309.29], 'Top_XY': [679.2...   
4   041_130833   AP  {'Left_XY': [541.55, 311.56], 'Top_XY': [688.8...   
5   047_130841   AP  {'Left_XY': [561.94, 309.86], 'Top_XY': [712.6...   
6   048_130827   AP  {'Left_XY': [567.61, 308.16], 'Top_XY': [708.6...   
7   050_130858   AP  {'Left_XY': [554.01, 311.56], 'Top_XY': [702.4...   
8   057_130822   AP  {'Left_XY': [566.47, 307.03], 'Top_XY': [705.8...   
9   068_130805   AP  {'Left_XY': [524.55, 314.39], 'Top_XY': [669.0...   
10  072_130812   AP  {'Left_XY': [528.64, 311.92], 'Top_XY': [687.6...   
11  093_130848   AP  {'Left_XY': [531.35, 322.89], 'Top_XY': [673.5...   
12  101_130824   AP  {'Left_XY': [488.

In [110]:
from pydicom.dataset import Dataset, FileDataset
from pydicom.uid import generate_uid, ExplicitVRLittleEndian
from pydicom.sequence import Sequence
from datetime import datetime
from pathlib import Path
import pydicom

def create_dicom_sr(features, accession_number, graphic_type, text_value, fn, series_num, instance_num, output_path):
    # Create a dummy evidence item with required attributes
    dummy_evidence = Dataset()
    dummy_evidence.PatientID = 'REDACTED' # Required
    dummy_evidence.PatientName = 'REDACTED' # Required
    dummy_evidence.PatientBirthDate = 'REDACTED' # Required
    dummy_evidence.PatientSex = 'O'  # Required
    dummy_evidence.AccessionNumber = accession_number  # Required
    dummy_evidence.StudyInstanceUID = generate_uid()
    dummy_evidence.SeriesInstanceUID = generate_uid()
    dummy_evidence.SOPInstanceUID = generate_uid()

    # Add each geometric feature as a content item
    content_items = []
    for feature in features:
        scoord_item = Dataset()
        scoord_item.ValueType = 'SCOORD' # SCOORD = Spatial Coordinates
        scoord_item.GraphicType = graphic_type # Possibles options: 'POINT', 'POLYLINE', 'MULTIPOINT', 'POLYGON', 'CIRCLE', 'ELLIPSE'
        # Flatten the list of points
        scoord_item.GraphicData = [coord for point in feature["points"] for coord in point]
        scoord_item.ConceptNameCodeSequence = Sequence([Dataset()])
        scoord_item.ConceptNameCodeSequence[0].CodeValue = 'T-D0050'
        scoord_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'SRT'
        scoord_item.ConceptNameCodeSequence[0].CodeMeaning = feature["label"]
        content_items.append(scoord_item)

    # Add a textual description explaining what the points correspond to
    text_item = Dataset()
    text_item.ValueType = 'TEXT'
    text_item.TextValue = text_value
    text_item.ConceptNameCodeSequence = Sequence([Dataset()])
    text_item.ConceptNameCodeSequence[0].CodeValue = '121071'
    text_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'DCM'
    text_item.ConceptNameCodeSequence[0].CodeMeaning = 'Textual description'
    content_items.append(text_item)

    # Add a container to provide context about the geometric features
    container_item = Dataset()
    container_item.ValueType = 'CONTAINER'
    container_item.ContinuityOfContent = 'SEPARATE'
    container_item.ConceptNameCodeSequence = Sequence([Dataset()])
    container_item.ConceptNameCodeSequence[0].CodeValue = 'T-D0050'
    container_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'SRT'
    container_item.ConceptNameCodeSequence[0].CodeMeaning = "Measurement"
    container_item.ContentSequence = Sequence(content_items)

    # Create the DICOM SR dataset
    file_meta = Dataset()
    file_meta.MediaStorageSOPClassUID = '1.2.840.10008.5.1.4.1.1.88.11'  # Comprehensive SR Storage
    file_meta.MediaStorageSOPInstanceUID = generate_uid()
    file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
    file_meta.ImplementationClassUID = generate_uid()

    ds = FileDataset(fn, {}, file_meta=file_meta, preamble=b"\0" * 128)
    ds.is_little_endian = True
    ds.is_implicit_VR = False

    # Populate the dataset with required attributes
    ds.PatientID = dummy_evidence.PatientID
    ds.PatientName = dummy_evidence.PatientName
    ds.PatientBirthDate = dummy_evidence.PatientBirthDate
    ds.PatientSex = dummy_evidence.PatientSex
    ds.StudyInstanceUID = dummy_evidence.StudyInstanceUID
    ds.SeriesInstanceUID = dummy_evidence.SeriesInstanceUID
    ds.SOPInstanceUID = dummy_evidence.SOPInstanceUID
    ds.Modality = 'SR' # "Structured Report"
    ds.SeriesNumber = series_num
    ds.InstanceNumber = instance_num
    ds.CompletionFlag = 'COMPLETE'
    ds.VerificationFlag = 'UNVERIFIED'
    ds.ContentDate = datetime.now().strftime('%Y%m%d')
    ds.ContentTime = datetime.now().strftime('%H%M%S')
    ds.ConceptNameCodeSequence = Sequence([Dataset()])
    ds.ConceptNameCodeSequence[0].CodeValue = 'P5-09051'
    ds.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'SRT'
    ds.ConceptNameCodeSequence[0].CodeMeaning = 'Measurement'
    ds.ContentSequence = Sequence([container_item])

    # Save the SR to a DICOM file
    output_path = Path(output_path)
    ds.save_as(output_path / fn)

    print(f"DICOM SR saved to {output_path / fn}")

    # Load the saved dicom sr
    ds = pydicom.dcmread(output_path / fn)
    print(ds)

# Example usage
features = [
    {"points": [(30, 40)], "label": "TopXY"},
    {"points": [(10, 20)], "label": "LeftCY"},
    {"points": [(5, 5)], "label": "CenterXY"}
]
accession_number = 'REDACTED'
graphic_type = 'ELLIPSE'
text_value = 'The points correspond to the top, left, and center vertices from which the ellipse is derived.'
fn = 'test.dcm'
series_num = 1
instance_num = 1
output_path = r'C:\Users\dmattioli\OneDrive - University of Iowa\Downloads\tmp'


In [153]:
import pandas as pd
from pydicom.uid import generate_uid

# Assuming df is your DataFrame
# df = pd.read_csv('your_dataframe.csv')  # Example of loading a DataFrame

def create_dicom_sr(features, accession_number, graphic_type, text_value, fn, series_num, instance_num, output_path):
    from pydicom.dataset import Dataset, FileDataset
    from pydicom.sequence import Sequence
    from datetime import datetime
    from pathlib import Path
    import pydicom

    # Create a dummy evidence item with required attributes
    dummy_evidence = Dataset()
    dummy_evidence.PatientID = 'REDACTED' # Required
    dummy_evidence.PatientName = 'REDACTED' # Required
    dummy_evidence.PatientBirthDate = '19000101' # Required
    dummy_evidence.PatientSex = 'O'  # Required
    dummy_evidence.AccessionNumber = accession_number  # Required
    dummy_evidence.StudyInstanceUID = accession_number  # ***Should we do anything unique with these?***
    dummy_evidence.SeriesInstanceUID = accession_number # ***Should we do anything unique with these?***
    dummy_evidence.SOPInstanceUID = generate_uid()

    # Add each geometric feature as a content item
    content_items = []
    for feature, g_type, t_value in zip(features, graphic_type, text_value):
        scoord_item = Dataset()
        scoord_item.ValueType = 'SCOORD' # SCOORD = Spatial Coordinates
        scoord_item.GraphicType = g_type # Possibles options: 'POINT', 'POLYLINE', 'MULTIPOINT', 'POLYGON', 'CIRCLE', 'ELLIPSE'
        # Flatten the list of points and convert to float
        try:
            scoord_item.GraphicData = [float(coord) for point in feature["points"] for coord in point]
        except ValueError as e:
            print(f"Error converting points to float: {e}")
            print(f"Feature: {feature}")
            continue  # Skip this feature if there is an error
        scoord_item.ConceptNameCodeSequence = Sequence([Dataset()])
        scoord_item.ConceptNameCodeSequence[0].CodeValue = 'T-D0050'
        scoord_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'SRT'
        scoord_item.ConceptNameCodeSequence[0].CodeMeaning = feature["label"]
        content_items.append(scoord_item)

    # Add a textual description explaining what the points correspond to
    text_item = Dataset()
    text_item.ValueType = 'TEXT'
    text_item.TextValue = 'test'  # Assuming the first text value is the description
    text_item.ConceptNameCodeSequence = Sequence([Dataset()])
    text_item.ConceptNameCodeSequence[0].CodeValue = '121071'
    text_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'DCM'
    text_item.ConceptNameCodeSequence[0].CodeMeaning = 'Textual description'
    text_item.add_new(0x0040A160, 'UT', text_value )  # Use UT VR for unlimited text
    content_items.append(text_item)

    # Add a container to provide context about the geometric features
    container_item = Dataset()
    container_item.ValueType = 'CONTAINER'
    container_item.ContinuityOfContent = 'SEPARATE'
    container_item.ConceptNameCodeSequence = Sequence([Dataset()])
    container_item.ConceptNameCodeSequence[0].CodeValue = 'T-D0050'
    container_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'SRT'
    container_item.ConceptNameCodeSequence[0].CodeMeaning = "Measurement"
    container_item.ContentSequence = Sequence(content_items)

    # Create the DICOM SR dataset
    file_meta = Dataset()
    file_meta.MediaStorageSOPClassUID = '1.2.840.10008.5.1.4.1.1.88.11'  # Comprehensive SR Storage
    file_meta.MediaStorageSOPInstanceUID = generate_uid()
    file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
    file_meta.ImplementationClassUID = generate_uid()

    ds = FileDataset(fn, {}, file_meta=file_meta, preamble=b"\0" * 128)
    ds.is_little_endian = True
    ds.is_implicit_VR = False

    # Populate the dataset with required attributes
    ds.PatientID = dummy_evidence.PatientID
    ds.PatientName = dummy_evidence.PatientName
    ds.PatientBirthDate = dummy_evidence.PatientBirthDate
    ds.PatientSex = dummy_evidence.PatientSex
    ds.StudyInstanceUID = dummy_evidence.StudyInstanceUID
    ds.SeriesInstanceUID = dummy_evidence.SeriesInstanceUID
    ds.SOPInstanceUID = dummy_evidence.SOPInstanceUID
    ds.Modality = 'SR' # "Structured Report"
    ds.SeriesNumber = series_num
    ds.InstanceNumber = instance_num
    ds.CompletionFlag = 'COMPLETE'
    ds.VerificationFlag = 'UNVERIFIED'
    ds.ContentDate = datetime.now().strftime('%Y%m%d')
    ds.ContentTime = datetime.now().strftime('%H%M%S')
    ds.ConceptNameCodeSequence = Sequence([Dataset()])
    ds.ConceptNameCodeSequence[0].CodeValue = 'P5-09051'
    ds.ConceptNameCodeSequence[0].CodingSchemeDesignator = 'SRT'
    ds.ConceptNameCodeSequence[0].CodeMeaning = 'Measurement'
    ds.ContentSequence = Sequence([container_item])

    # Save the SR to a DICOM file
    output_path = Path(output_path)
    ds.save_as(output_path / fn)
    return ds


output_path = r'C:\Users\dmattioli\OneDrive - University of Iowa\Downloads\tmp'
accession_number = generate_uid()
output_path = Path(output_path) / str( accession_number ).replace('.', '_')
os.mkdir( output_path )
series_num = 1  # Use the row index + 1 as a unique integer for SeriesNumber
print( f'FolderName: {accession_number}' )
for index, row in df.iterrows():
    femoral_head_xy = [[float(coord) for coord in v] for v in row['femoral_head'].values()]
    
    # if row['wire']['XY'] is not empty
    if len(row['wire']['XY']) > 0: wire_info = [[float(coord) for coord in v] for v in row['wire']['XY']]
    else:                   wire_info = [[np.nan]]
    features = [
        {"points": femoral_head_xy, "label": "Femoral Head"},
        {"points": row['femoral_neck'], "label": "Femoral Neck"},
        {"points": wire_info, "label": "Wire"}
    ]
    graphic_type = ['ELLIPSE', 'LINE', 'LINE']
    text_value = ['3 defining points of ellipse; [leftx1 lefty1 topx1 topy1 centerx1 centery1]', 'Femoral neck bisector; [x1 y1 x2 y2]', 'Wire info; [entryx1 entryy1 tipx2 tipy2 pixelwidth mmwidth]']
    fn = row['filenames']
    instance_num = int(fn[:3])  # Convert the first 3 characters of the filename to an integer
    
    ds = create_dicom_sr(features, accession_number, graphic_type, text_value, fn, series_num, instance_num, output_path)

    print( f'\nFile: {fn} ')
    print( ds )

FolderName: 1.2.826.0.1.3680043.8.498.66902964531829963427179279246854874276

File: 002_120821 
Dataset.file_meta -------------------------------
(0002, 0002) Media Storage SOP Class UID         UI: Basic Text SR Storage
(0002, 0003) Media Storage SOP Instance UID      UI: 1.2.826.0.1.3680043.8.498.93295463020334009840666896674517758163
(0002, 0010) Transfer Syntax UID                 UI: Explicit VR Little Endian
(0002, 0012) Implementation Class UID            UI: 1.2.826.0.1.3680043.8.498.40619703683525919336390786452349124153
-------------------------------------------------
(0008, 0018) SOP Instance UID                    UI: 1.2.826.0.1.3680043.8.498.31824358247363462966997595342338193012
(0008, 0023) Content Date                        DA: '20241015'
(0008, 0033) Content Time                        TM: '135611'
(0008, 0060) Modality                            CS: 'SR'
(0010, 0010) Patient's Name                      PN: 'REDACTED'
(0010, 0020) Patient ID                         

### Mass upload

In [3]:
# Code for reading in mass upload form.
import pandas as pd
import os 
import warnings

# Suppress specific warning from openpyxl
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")

file_path = r'C:\Users\dmattioli\OneDrive - University of Iowa\Downloads\Mass_Upload_Template.xlsx'


In [45]:
from datetime import datetime
import ast
import json

date_string = "1999-31-12"  # Example date string in format YYYY-DD-MM
date_format = "%Y-%d-%m"
date_to_check = datetime.strptime(date_string, date_format)


required_column_doc = r'C:\Users\dmattioli\Projects\XNAT-Interact\doc\Mass_Upload_Doc-Required_Columns.json'
def process_mass_upload_form( ffn: Path, verbose: bool = False ) -> Tuple[ pd.DataFrame, pd.DataFrame ]:
    """
    Process the mass upload form to extract the relevant data.
    """
    # Extract the file, format the column headers
    df = pd.read_excel( ffn, header=0 )
    df.columns = [col.replace('\n', ' ').strip().replace(' ', '_').lower() for col in df.columns]
    
    # Read the JSON file to get the column descriptions
    with open(required_column_doc, 'r') as file:
        column_descriptions = json.load(file)
    required_columns = [col for col, desc in column_descriptions.items() if 'required' in desc.lower()]
    required_columns = [col.replace(' ', '_').lower() for col in required_columns]
    conditional_columns = [col for col, desc in column_descriptions.items() if 'conditional' in desc.lower()]
    conditional_columns = [col.replace(' ', '_').lower() for col in conditional_columns]
    optional_columns = [col for col, desc in column_descriptions.items() if 'optional' in desc.lower()]
    optional_columns = [col.replace(' ', '_').lower() for col in optional_columns]

    # Check if the DataFrame contains all the required column names
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:     raise ValueError(f"The following required columns are missing: {', '.join(missing_columns)}")
    
    # Retrieve registed items from metatables config.
    hawk_ids = [id.lower() for id in metatables.list_of_all_items_in_table( table_name='Registered_Users' )]
    institutions = [i.lower() for i in metatables.list_of_all_items_in_table( table_name='Acquisition_Sites' )]
    procedure_names = [p.lower() for p in metatables.list_of_all_items_in_table( table_name='Groups' )]
    surgeons = [s.lower() for s in metatables.list_of_all_items_in_table( table_name='Surgeons' )]
    surgeon_encodings = [metatables.get_uid( table_name='Surgeons', item_name=s ) for s in surgeons]
    print( surgeon_encodings)
    
    # Iterate through each row, performing custom checks on all the columns
    issues = {}
    for idx, row in df.iterrows(): 
        if not row['Filer_Hawk_ID'] or row['Filer_Hawk_ID'].lower() not in hawk_ids:                                        # Required
            issues.setdefault( idx, [] ).append( f"Error: Filer_Hawk_ID '{row['Filer_Hawk_ID']}' not found in Registered_Users." )
        if not row['Operation_Date'] or datetime.strptime( row['Operation_Date'], date_format ) < datetime( 2000, 1, 1 ):   # Required
            issues.setdefault( idx, [] ).append( f"Warning: Operation_Date '{row['Operation_Date']}' is before January 1, 2000; make sure this is intentional." )
        if row['Quality'] in ['', ' ']:                                                                                     # Optional
            issues.setdefault( idx, [] ).append( f"Warning: Quality is blank, converting to 'Unknown'." )
            row['Quality'] = 'Unknown'
        if not row['Institution'] or row['Institution'].lower() not in institutions:                                        # Required
            issues.setdefault( idx, [] ).append( f"Error: Institution '{row['Institution']}' not found in Acquisition_Sites." )
        if not row['Procedure_Name'] or row['Procedure_Name'].lower() not in procedure_names:                               # Required
            issues.setdefault( idx, [] ).append( f"Error: Procedure_Name '{row['Procedure_Name']}' not found in Groups." )
        if datetime.strptime( row['Epic_End_Time'], '%H:%M' ) < datetime.strptime( row['Epic_Start_Time'], '%H:%M' ):       # Optional
            issues.setdefault( idx, [] ).append( f"Error: Epic_End_Time '{row['Epic_End_Time']}' is before Epic_Start_Time '{row['Epic_Start_Time']}'." )
        if row['Supervising_Surgeon_Hawk_ID'] and row['Supervising_Surgeon'].lower() not in surgeons:                       # Optional
            issues.setdefault( idx, [] ).append( f"Error: Supervising_Surgeon '{row['Supervising_Surgeon']}' not found in Surgeons." )
            row['Supervising_Surgeon_Hawk_ID'] = metatables.get_uid( table_name='Surgeons', item_name=row['Supervising_Surgeon'] )
        if row['Supervising_Surgeon_Hawk_ID'] and not row['Supervising_Surgeon_Presence']:                                  # Conditional
            issues.setdefault( idx, [] ).append( f"Error: Supervising_Surgeon_Presence should not be blank if Supervising_Surgeon is specified." )
        if not row['Performing_Surgeon_Hawk_ID'] or row['Performing_Surgeon_Hawk_ID'].lower() not in hawk_ids:              # Required
            issues.setdefault( idx, [] ).append( f"Error: Performing_Surgeon_Hawk_ID '{row['Performing_Surgeon_Hawk_ID']}' not found in Registered_Users." )
            row['Performing_Surgeon_Hawk_ID'] = metatables.get_uid( table_name='Surgeons', item_name=row['Performing_Surgeon_Hawk_ID'] )
        if int( row['#_of_Participating_Performing_Surgeons'] ) > 1:
            # Structure of the Performer-HawkID_Task string must be a python dict and each key must be a valid HawkID.
            in_string = row['performer_hawkid_task']
            try:
                performer_hawkid_task = ast.literal_eval(row['performer_hawkid_task'])
                if not isinstance(performer_hawkid_task, dict):
                    issues.setdefault( idx, []).append( f"Error: Format of Performer-HawkID_Task must be in the following format '{{hawkid: task description, ..., hawkidN: task description N}}'.")
                for key in performer_hawkid_task.keys(): # Check if each key in the dict is a valid HawkID, and replace it with its encoding.
                    if key.lower() not in hawk_ids:
                        issues.setdefault(idx, []).append(f"Error: HawkID '{key}' in Performer-HawkID_Task is not found in Registered_Users.")
                    else:  # replace hawkid within the string with its encoding
                        performer_hawkid_task[ key ] = metatables.get_uid( table_name='Registered_Users', item_name=key )
                    
                    # Scan value text for any hawkids and create an error if any are found.

            except (ValueError, SyntaxError):
                issues.setdefault(idx, []).append(f"Error: Performer-HawkID_Task '{row['performer_hawkid_task']}' is not a valid Python dictionary.")
    
    return df

df = process_mass_upload_form( ffn=file_path, verbose=True )
# print( df.head())

['2_25_105373882644105819776871969948046613274', '2_25_153113018170584888948755921451119838249', '2_25_177605179432356784205179356109061276194', '2_25_320765609774914596692668931887446750776', '2_25_242756231263186740665400077691763092587', '2_25_47055081668334048644279774565120864693', '2_25_215985043920986609876260294586229627111', '2_25_262670161840697720830403676802732838312', '2_25_218272850888645267798384589845335857139', '2_25_127334654128346158182508967592879764035', '2_25_129749141168694875669322969513868282389', '2_25_112695354475414863217312730504962126307', '2_25_137379639105875891001305711420270768354', '2_25_234173235788339523460282711441599136090', '2_25_15341770874223398978416237652824520511', '2_25_120280789638296911636306918117949856915', '2_25_174428051917962190993648216004095686489', '2_25_2133594273542189444980757900887342384', '2_25_95150779983927200445036613252319196358', '2_25_132191392814164237665439919963654817336', '2_25_78073577460690605142165046421438538314

In [16]:
with Interface( server='https://rpacs.iibi.uiowa.edu/xnat', user='dmattioli', password='PooPoopoopoo123$' ) as xnat:
    proj_instance = xnat.select.project( 'GROK_AHRQ_main' )
    subj_instance = proj_instance.subject( '2_25_238253227114596188996505003310603781899' )
    subj_instance.delete()
    print( subj_instance.exists() )

False


In [106]:
import pandas as pd

def format_as_table(json_table):
    # Convert the JsonTable object to a list of dictionaries
    data = [item for item in json_table]
    
    # Create a DataFrame from the list of dictionaries
    return pd.DataFrame(data)
    

In [100]:
def print_preview_of_xnat_data( pd_table ):
    print( f'# Performances: {len(pd_table)}' )

    # Columns to display: procedure operation_date upload_date upload_time
    pd_copy = pd_table.copy()
    pd_copy.index = pd_copy.index + 1
    pd_copy['upload_time'] = pd_copy['upload_time'].apply(lambda x: x.strftime('%H:%M'))
    print( pd_copy[['procedure', 'operation_date', 'upload_date', 'upload_time']] )

In [105]:
# xnat:subjectData/GROUP
# xnat:experimentData/ACQUISITION_SITE
# xnat:experimentData/DATE

# user_prompt = input( 'Please select a procedure that you want to query:')
user_prompt = 'KNEE_ARTHROSCOPY-PRE_DIAGNOSTIC'
constraints =  [('xnat:subjectData/PROJECT', '=', 'GROK_AHRQ_Data'),
                'AND',
                ('xnat:subjectData/GROUP' , '=', user_prompt )
                ]


user_date_range = ['2021-11-01', '2024-12-31']

# Query all experiments --> note that we can specify esv v rf if we want. We could also query derived data. Not sure how to separate them if we query all at once without doing another query. Perpahs you do separate queries and the combine the tables.
constraints =  [('xnat:esvSessionData/PROJECT', '=', 'GROK_AHRQ_Data'),
                'OR',
                ('xnat:rfSessionData/PROJECT' , '=', 'GROK_AHRQ_Data' )]
with Interface( server='https://rpacs.iibi.uiowa.edu/xnat', user='dmattioli', password='PooPoopoopoo123$' ) as xnat:
    # Get all experiments.
    all_data = xnat.select('xnat:esvSessionData').where(constraints)
    all_data_pd = format_as_table( all_data )

    # Remove some dumb columns
    cols_to_remove = ['age', 'project']
    all_data_pd.drop( columns=cols_to_remove, inplace=True )

    # rename 'date' to 'operation_date'
    all_data_pd.rename(columns={'date': 'operation_date'}, inplace=True)


    # Perform query that retrieves the subject names
    new_constraints =  [('xnat:subjectData/PROJECT', '=', 'GROK_AHRQ_Data'), 'AND']
    sub_constraints = []
    subject_ids = all_data_pd['subject_id'].unique()
    for i, subject_id in enumerate(subject_ids):
        if i > 0:
            sub_constraints.append('OR')
        sub_constraints.append( ('xnat:subjectData/SUBJECT_ID', '=', subject_id) )
    new_constraints.append( sub_constraints )
    subj_qs = xnat.select('xnat:subjectData').where(new_constraints)
    subj_pd = format_as_table( subj_qs )
    cols_to_remove = ['gender_text', 'handedness_text', 'dob', 'educ', 'add_ids', 'race', 'ethnicity', 'invest_csv', 'ses', 'projects']
    subj_pd.drop( columns=cols_to_remove, inplace=True )

    # Append the following columns of subj_pd to all_data_pd: 'sub_group', 'xnat_col_subjectdatalabel'
    all_data_pd = all_data_pd.reset_index(drop=True)
    subj_pd = subj_pd.reset_index(drop=True)

    # Append the 'sub_group' and 'xnat_col_subjectdatalabel' columns from subj_pd to all_data_pd
    all_data_pd['procedure'] = subj_pd['sub_group']
    all_data_pd['subject_id'] = subj_pd['xnat_col_subjectdatalabel']
    # all_data_pd.drop( columns=['subject_id', 'expt_id'], inplace=True )

    # # Split insert_date into two columns
    all_data_pd['insert_date'] = pd.to_datetime(all_data_pd['insert_date'])
    all_data_pd['upload_date'] = all_data_pd['insert_date'].dt.date
    all_data_pd['upload_time'] = all_data_pd['insert_date'].dt.time
    all_data_pd.drop( columns=['insert_date'], inplace=True )
    print_preview_of_xnat_data( all_data_pd )

    # Ask the user if they want to filter by date
    user_date_filter = '1'
    if user_date_filter == '1':
        # Filter by date
        start_date = '2021-11-01'
        end_date = '2024-12-31'
        all_data_pd['operation_date'] = pd.to_datetime(all_data_pd['operation_date'])
        all_data_pd = all_data_pd[ ( all_data_pd['operation_date'] >= start_date ) & ( all_data_pd['operation_date'] <= end_date ) ]
        print_preview_of_xnat_data( all_data_pd )

    # ask the user if they want to filter by procedure
    user_procedure_filter = '1'
    if user_procedure_filter == '1':
        # Filter by procedure
        user_procedure = 'KNEE_ARTHROSCOPY-PRE_DIAGNOSTIC'
        all_data_pd = all_data_pd[ all_data_pd['procedure'] == user_procedure ]
        print_preview_of_xnat_data( all_data_pd )


<class 'pyxnat.core.jsonutil.JsonTable'>
<class 'pyxnat.core.jsonutil.JsonTable'>
# Performances: 24
                               procedure operation_date upload_date  \
1        KNEE_ARTHROSCOPY-PRE-DIAGNOSTIC     2021-07-14  2024-08-28   
2       KNEE_ARTHROSCOPY-POST_DIAGNOSTIC     2021-12-30  2024-08-28   
3        KNEE_ARTHROSCOPY-PRE-DIAGNOSTIC     2021-11-10  2024-08-28   
4       KNEE_ARTHROSCOPY-POST_DIAGNOSTIC     2021-02-22  2024-08-28   
5       KNEE_ARTHROSCOPY-POST_DIAGNOSTIC     2021-10-19  2024-08-28   
6        KNEE_ARTHROSCOPY-PRE-DIAGNOSTIC     2021-09-03  2024-08-28   
7   KNEE_ARTHROSCOPY-MENISCAL_TRANSPLANT     2021-09-07  2024-08-28   
8        KNEE_ARTHROSCOPY-PRE-DIAGNOSTIC     2023-11-21  2024-08-28   
9        KNEE_ARTHROSCOPY-PRE_DIAGNOSTIC     2024-04-03  2024-08-28   
10       KNEE_ARTHROSCOPY-PRE_DIAGNOSTIC     2024-04-17  2024-08-28   
11      KNEE_ARTHROSCOPY-POST_DIAGNOSTIC     2024-04-24  2024-08-28   
12       KNEE_ARTHROSCOPY-PRE_DIAGNOSTIC     20

### Query by experiment major (trauma v arthro) type

In [None]:
constraints =  [('xnat:esvSessionData/PROJECT', '=', 'GROK_AHRQ_Data')]
with Interface( server='https://rpacs.iibi.uiowa.edu/xnat', user='dmattioli', password='PooPoopoopoo123$' ) as xnat:
    # poo = xnat.select('xnat:experimentData' ).where(constraints)
    poo = xnat.select('xnat:esvSessionData').where(constraints)
    poopd = format_as_table( poo )
    print( poopd )

    # Construct a set of constraints that allow xnat:subjectData/Subject_ID to be in the list of subject ids
    new_constraints =  [('xnat:subjectData/PROJECT', '=', 'GROK_AHRQ_Data'),
                        'AND'
                        ]
    # Create a sublist resembling the following for each subject_id: ('xnat:subjectData/Subject_ID', '=', subject_id)
    sub_constraints = []
    for i, subject_id in enumerate(subject_ids):
        if i > 0:
            sub_constraints.append('OR')
        sub_constraints.append( ('xnat:subjectData/SUBJECT_ID', '=', subject_id) )
    new_constraints.append( sub_constraints )
    print( new_constraints)



    poo = xnat.select('xnat:subjectData').where(new_constraints)
    poopd = format_as_table( poo )

    
    # Remove superfluous columns from the table
    cols_to_remove = ['gender_text', 'handedness_text', 'dob', 'educ', 'add_ids', 'race', 'ethnicity', 'invest_csv', 'ses', 'projects']
    poopd.drop( columns=cols_to_remove, inplace=True )

    # Split the insert_date column into two columns
    poopd['insert_date'] = pd.to_datetime(poopd['insert_date'])
    poopd['upload_date'] = poopd['insert_date'].dt.date
    poopd['upload_time'] = poopd['insert_date'].dt.time
    poopd.drop( columns=['insert_date'], inplace=True )

    # Rename the remaining columns
    poopd.rename(columns={'sub_group': 'procedure', 'xnat_col_subjectdatalabel': 'subject_id'}, inplace=True)

    # Reorder the columns
    poopd = poopd[['subject_id', 'procedure', 'upload_date', 'upload_time', 'insert_user']]
    print( poopd )

In [None]:
constraints =  [('xnat:subjectData/PROJECT', '=', 'GROK_AHRQ_Data'),
                'AND',
                ('xnat:subjectData/GROUP' , '=', 'KNEE_ARTHROSCOPY-PRE_DIAGNOSTIC')
                ]
with Interface( server='https://rpacs.iibi.uiowa.edu/xnat', user='dmattioli', password='PooPoopoopoo123$' ) as xnat:
    poo = xnat.select('xnat:subjectData' ).where(constraints)
    poopd = format_as_table( poo )

##### Delete all subjects in server

In [None]:
def write_acceptable_keywords( metatables: MetaTables ):
    # walk through the 'GROUPS' and 'ACQUISITION_SITES' tables and write the Name for each item as a row in a text file that mimics a compact dataframe
    target_tables = ['GROUPS','ACQUISITION_SITES']
    out_ffn = os.path.join( metatables.doc_dir, 'acceptable_upload_keyword_inputs.txt')
    text_to_write = f'Acceptable Inputs -- Uploading a New Performance\n{"---"*20}\n\n'
    for t_name in target_tables:
        table_items_list = metatables.list_of_all_items_in_table( table_name=t_name )
        text_to_write += f'Key: {t_name}\n'
        for item in table_items_list:
            text_to_write += f'\t- {item}\n'
        text_to_write += '\n'
        
    with open( out_ffn, 'w' ) as f:
        f.write( text_to_write )
    # with open( f'{t_name}.txt', 'w' ) as f:
    #     f.write( f'{t_name}\n' )
    #     f.write( f'NAME\n' )

write_acceptable_keywords( metatables )
    