Overview

This code defines a function identify_and_check_scans(root_directory) that goes through the subdirectories of a given root directory to identify and check MRI scans. It searches each PAR file for specific keywords to identify the scan type, then compares the sizes of the corresponding PAR, NII, and REC files with the expected sizes.

The tolerance is set to 3% (it could be easily adjusted), and a dictionary scan_keywords_and_sizes is defined to map scan types to specific keywords and expected file sizes for the corresponding PAR, NII, and REC files.

Two lists, maps_fine and maps_to_check_manually, are initialized to keep track of the maps (i.e. the participant directories) that are fine and those that need manual checking.

In [2]:
import os


def identify_and_check_scans(root_directory, tolerance=3.0):
    """Identify the MRI scans in each map (subdirectory) and check their file sizes.

    Every ``.PAR`` file found directly inside a map is read and matched
    against a table of keywords; the first keyword found determines the scan
    type. The sizes of the PAR file and of its ``.nii`` / ``.REC`` companions
    (same base name) are then compared with the expected sizes. A report is
    printed per map, followed by an overview of the maps that are fine and
    the maps that need a manual check.

    Args:
        root_directory: Directory whose subdirectories (maps) are checked.
            Plain files directly inside it are ignored.
        tolerance: Allowed deviation from the expected file size, in percent.

    Returns:
        None. All results are printed.
    """
    # Keywords to search in PAR files and expected file sizes in bytes (PAR, NII, REC).
    # Order matters: the first matching keyword wins, so "B0-map_RS" must stay
    # ahead of "B0-map", which is a substring of it.
    scan_keywords_and_sizes = {
        "3D T1": ("PACS 3DT1", [51278, 18350432, 18350080]),
        "fMRI highres": ("fMRI hires", [114451, 8429920, 8429568]),
        "SDDT1": ("fMRI SDDT1 200", [2641255, 103949152, 103948800]),
        "SDDT2": ("fMRI SDDT2 200", [2652305, 104384352, 104384000]),
        "B0-map_RS": ("B0-map_RS", [30503, 9961824, 9961472]),
        "B0-map": ("B0-map", [30498, 9961824, 9961472]),
        "Resting State": ("rsfMRI 140", [2575278, 101350752, 101350400]),
        "SNAT1": ("SNAT1 149", [1781630, 70093152, 70092800]),
        "SNAT2": ("SNAT2 142", [1709480, 67251552, 67251200]),
        "SNAT3": ("SNAT3 141", [1697455, 66777952, 66777600]),
        "DTI A-P": ("jones30_P_NoCardiac", [756906, None, 76185600]),
        "DTI P-A": ("jones30_A_NoCardiac", [756906, None, 76185600]),
    }
    par_suffix = ".PAR"

    # Keep track of maps that are fine and maps that need manual checking
    maps_fine = []
    maps_to_check_manually = []

    # Sorted so that the report order does not depend on the file system.
    for directory_name in sorted(os.listdir(root_directory)):
        directory_path = os.path.join(root_directory, directory_name)

        # Skip files, only process directories
        if not os.path.isdir(directory_path):
            continue

        print(f"Checking map: {directory_name}")

        # Per scan type: [PAR found, NII found, REC found], followed by one
        # (file name, size, status) tuple per checked file.
        identified_scans = {scan_type: [False, False, False] for scan_type in scan_keywords_and_sizes}
        # No NII size is expected for the DTI scans, so NII counts as present.
        identified_scans["DTI A-P"][1] = True
        identified_scans["DTI P-A"][1] = True
        abnormal_files = []

        for par_file_name in sorted(os.listdir(directory_path)):
            if not par_file_name.endswith(par_suffix):
                continue

            par_path = os.path.join(directory_path, par_file_name)
            # Only ASCII keywords are searched for, so undecodable bytes are
            # replaced instead of aborting the whole run.
            with open(par_path, "r", errors="replace") as par_file:
                content = par_file.read()

            # Swap only the extension; str.replace would also rewrite a
            # ".PAR" occurring earlier in the name.
            stem = par_file_name[:-len(par_suffix)]
            nii_file_name = stem + ".nii"
            rec_file_name = stem + ".REC"
            nii_path = os.path.join(directory_path, nii_file_name)
            rec_path = os.path.join(directory_path, rec_file_name)

            for scan_type, (keyword, expected_sizes) in scan_keywords_and_sizes.items():
                if keyword not in content:
                    continue

                # Whether THIS PAR file has its companions. The size check must
                # use these, not the cumulative flags below: another PAR file of
                # the same scan type may already have set them.
                has_nii = expected_sizes[1] is not None and os.path.exists(nii_path)
                has_rec = os.path.exists(rec_path)

                scan_entry = identified_scans[scan_type]
                scan_entry[0] = True
                if has_nii:
                    scan_entry[1] = True
                if has_rec:
                    scan_entry[2] = True

                candidates = (
                    (par_file_name, par_path, True),
                    (nii_file_name, nii_path, has_nii),
                    (rec_file_name, rec_path, has_rec),
                )
                for (checked_name, checked_path, present), expected_size in zip(candidates, expected_sizes):
                    if expected_size is None or not present:
                        continue
                    actual_size = os.path.getsize(checked_path)
                    lower_bound = expected_size * (1 - tolerance / 100)
                    upper_bound = expected_size * (1 + tolerance / 100)
                    if lower_bound <= actual_size <= upper_bound:
                        status = "OK"
                    else:
                        status = "Abnormal"
                        abnormal_files.append((checked_name, actual_size, expected_size))
                    scan_entry.append((checked_name, actual_size, status))

                # A PAR file belongs to exactly one scan type: stop at the first match.
                break

        print("Identified scans:")
        for scan_type, scan_entry in identified_scans.items():
            print(scan_type + ":")
            # The first three items are the format flags; the rest are checked files.
            for checked_name, actual_size, status in scan_entry[3:]:
                print(f" - {checked_name} (Weight: {actual_size} bytes) - {status}")

        if abnormal_files:
            print("\nAbnormalities found:")
            for checked_name, actual_size, expected_size in abnormal_files:
                print(f" - {checked_name}\n   Actual Weight: {actual_size} bytes\n   Expected Weight: {expected_size} bytes")
        else:
            print("No abnormalities were found.")

        # Check for missing scans and formats
        print("\nMissing scans:")
        missing_found = False
        for scan_type, scan_entry in identified_scans.items():
            missing_formats = []
            if not scan_entry[0]:
                missing_formats.append("PAR")
            if not scan_entry[1] and scan_type not in ["DTI A-P", "DTI P-A"]:
                missing_formats.append("NII")
            if not scan_entry[2]:
                missing_formats.append("REC")
            if missing_formats:
                missing_found = True
                print(f" - {scan_type}")
                print(f"   Formats: {', '.join(missing_formats)}")

        if not missing_found:
            print("No scans are missing.")

        # A map is fine only if nothing is abnormal and every format of every scan was found.
        if abnormal_files or any(not all(scan_entry[:3]) for scan_entry in identified_scans.values()):
            maps_to_check_manually.append(directory_name)
        else:
            maps_fine.append(directory_name)

    print("\nOverview:")
    print("Maps that are fine:")
    for map_name in maps_fine:
        print(f" - {map_name}")

    print("\nMaps to check manually:")
    for map_name in maps_to_check_manually:
        print(f" - {map_name}")

# Root directory passed to identify_and_check_scans. Change this
# root_directory to the real one (this is just my trial directory).
root_directory = "my_directory" 
identify_and_check_scans(root_directory)


Checking map: SU37017401
Identified scans:
3D T1:
 - 37017401_2_1.PAR (Weight: 51276 bytes) - OK
 - 37017401_2_1.nii (Weight: 18350432 bytes) - OK
 - 37017401_2_1.REC (Weight: 18350080 bytes) - OK
fMRI highres:
 - 37017401_3_1.PAR (Weight: 114449 bytes) - OK
 - 37017401_3_1.nii (Weight: 8429920 bytes) - OK
 - 37017401_3_1.REC (Weight: 8429568 bytes) - OK
SDDT1:
 - 37017401_8_1.PAR (Weight: 2621754 bytes) - OK
 - 37017401_8_1.nii (Weight: 103181152 bytes) - OK
 - 37017401_8_1.REC (Weight: 103180800 bytes) - OK
SDDT2:
 - 37017401_9_1.PAR (Weight: 2633454 bytes) - OK
 - 37017401_9_1.nii (Weight: 103641952 bytes) - OK
 - 37017401_9_1.REC (Weight: 103641600 bytes) - OK
B0-map_RS:
 - 37017401_13_1.PAR (Weight: 30501 bytes) - OK
 - 37017401_13_1.nii (Weight: 9961824 bytes) - OK
 - 37017401_13_1.REC (Weight: 9961472 bytes) - OK
B0-map:
 - 37017401_4_1.PAR (Weight: 30497 bytes) - OK
 - 37017401_4_1.nii (Weight: 9961824 bytes) - OK
 - 37017401_4_1.REC (Weight: 9961472 bytes) - OK
Resting State:


Identified scans:
3D T1:
 - SU37110501_2_1.PAR (Weight: 51282 bytes) - OK
 - SU37110501_2_1.nii (Weight: 18350432 bytes) - OK
 - SU37110501_2_1.REC (Weight: 18350080 bytes) - OK
fMRI highres:
 - SU37110501_3_1.PAR (Weight: 114455 bytes) - OK
 - SU37110501_3_1.nii (Weight: 8429920 bytes) - OK
 - SU37110501_3_1.REC (Weight: 8429568 bytes) - OK
SDDT1:
 - SU37110501_8_1.PAR (Weight: 2681884 bytes) - OK
 - SU37110501_8_1.nii (Weight: 105549152 bytes) - OK
 - SU37110501_8_1.REC (Weight: 105548800 bytes) - OK
SDDT2:
 - SU37110501_9_1.PAR (Weight: 2642234 bytes) - OK
 - SU37110501_9_1.nii (Weight: 103987552 bytes) - OK
 - SU37110501_9_1.REC (Weight: 103987200 bytes) - OK
B0-map_RS:
 - SU37110501_13_1.PAR (Weight: 30507 bytes) - OK
 - SU37110501_13_1.nii (Weight: 9961824 bytes) - OK
 - SU37110501_13_1.REC (Weight: 9961472 bytes) - OK
B0-map:
 - SU37110501_4_1.PAR (Weight: 30502 bytes) - OK
 - SU37110501_4_1.nii (Weight: 9961824 bytes) - OK
 - SU37110501_4_1.REC (Weight: 9961472 bytes) - OK
Rest

Transfer of the participants' maps (directories) to this directory: J:/Workgroups/FSW/Zwaartekracht/Data_C3_(MCC)/ses-w07-lab/(f)mri-scans

In [2]:
import os
import shutil

# Source and destination directories
source_directory = "J:/Workgroups/FSW/Zwaartekracht/Data_C3_(MCC)/ses-w07-lab/(f)mri-scans/export_lab-visit" 
destination_directory = "J:/Workgroups/FSW/Zwaartekracht/Data_C3_(MCC)/ses-w07-lab/(f)mri-scans" 

# Full paths of the subdirectories of the source directory (plain files are ignored)
selected_directories = []
for item_name in os.listdir(source_directory):
    item_path = os.path.join(source_directory, item_name)
    if os.path.isdir(item_path):
        selected_directories.append(item_path)

# Print the selected directories
print("Selected directories:")
for directory in selected_directories:
    print(f"  - {os.path.basename(directory)}")

# Transfer directories to destination directory
transferred_directories = []
skipped_directories = []
for directory_path in selected_directories:
    directory_name = os.path.basename(directory_path)
    target_path = os.path.join(destination_directory, directory_name)
    # shutil.move places the source INSIDE an already existing target directory
    # (or raises halfway through the batch), so never move onto an existing path.
    if os.path.exists(target_path):
        skipped_directories.append(directory_name)
        continue
    shutil.move(directory_path, target_path)
    transferred_directories.append(directory_name)

# Display final overview (only what was actually moved counts as transferred)
print("Final Overview:")
print(f"Number of directories transferred: {len(transferred_directories)}")
print("List of them:")
for directory_name in transferred_directories:
    print(f"  - {directory_name}")

if skipped_directories:
    print("Skipped (already present in the destination directory):")
    for directory_name in skipped_directories:
        print(f"  - {directory_name}")


Selected directories:
  - SUTEST01
  - SUTEST01 - Copy
  - SUTEST01 - Copy (2)
Final Overview:
Number of directories transferred: 3
List of them:
  - SUTEST01
  - SUTEST01 - Copy
  - SUTEST01 - Copy (2)
