In [1]:
import os
from pathlib import Path
import pandas as pd
import json
import re
from collections import defaultdict

In [2]:
# =========== CONFIGURE PATHS (use YOUR confirmed paths) ===========
# Root location of every data modality scanned by this notebook. All values are
# absolute, machine-specific Windows paths; edit them before running elsewhere.
DATA_PATHS = {
    'radiology': r"E:\CancerVision_Data\data\radiology\TCGA-LUAD",
    # Pathology is a list (not a single path): the slides are split across
    # several shard directories, and each shard is scanned independently.
    'pathology': [
        r"E:\CancerVision_Data\data\pathology\pathology_1",
        r"E:\CancerVision_Data\data\pathology\pathology_2",
        r"E:\CancerVision_Data\data\pathology\pathology_3",
        # NOTE(review): "pathalogy" looks like a typo, but the recorded scan output
        # shows this shard being found, so it is the real on-disk name. Do not
        # "correct" the spelling here without renaming the directory as well.
        r"F:\FYP_Preparation\Data\pathalogy\pathology_4",
    ],
    'genomics_mutations': r"E:\CancerVision_Data\data\Genomics\Mutations\mutations",
    'genomics_rnaseq': r"E:\CancerVision_Data\data\Genomics\RNASeq\RNAseq",
    'clinical': r"E:\CancerVision_Data\data\Clinical\Clinical_data",
}

# Every CSV/JSON artefact is written below this directory, which is relative to
# the current working directory. mkdir(exist_ok=True) makes re-runs harmless.
OUTPUT_DIR = Path("inventory_output")
OUTPUT_DIR.mkdir(exist_ok=True)

In [3]:

# =========== HELPERS ===========
# Finds "TCGA-" + 2 alphanumerics + "-" + 4 alphanumerics anywhere in a string
# (case-insensitive) and captures it as group 1. The scan cells use the captured
# text as the patient/case identifier of a file.
tcga_regex = re.compile(r"(TCGA-[A-Za-z0-9]{2}-[A-Za-z0-9]{4})", re.IGNORECASE)
# Matches only when the WHOLE string is an 8-4-4-4-12 hexadecimal UUID
# (anchored with ^...$, case-insensitive); it is applied to single path components.
uuid_regex = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.IGNORECASE)

def size_mb(path: Path):
    """Return the size of ``path`` in MiB (bytes / 1024**2).

    Returns ``None`` instead of raising when the file cannot be stat'ed
    (missing file, permission problem, ...), so a scan is never aborted by a
    single unreadable entry.
    """
    bytes_per_mib = 1024 * 1024
    try:
        n_bytes = path.stat().st_size
    except Exception:
        return None
    return n_bytes / bytes_per_mib

def find_files(root: Path, exts=None):
    """Recursively list the regular files below ``root``.

    Parameters
    ----------
    root : Path or str
        Directory to walk. A missing root yields an empty list.
    exts : iterable of str, optional
        Extensions to keep, matched case-insensitively against the END of the
        file name. The leading dot is optional (``'svs'`` == ``'.svs'``) and
        compound extensions such as ``'.maf.gz'`` are supported. ``None`` or an
        empty collection keeps every file.

    Returns
    -------
    list of Path
        Matching files, in the order ``Path.rglob`` yields them.
    """
    root = Path(root)
    if not root.exists():
        return []

    all_files = (p for p in root.rglob("*") if p.is_file())
    if not exts:
        return list(all_files)

    # Normalise once: lower-case, guaranteed leading dot, duplicates removed.
    wanted = tuple(sorted({
        ext.lower() if ext.startswith(".") else "." + ext.lower()
        for ext in exts
    }))

    def _has_wanted_ext(name: str) -> bool:
        # The length check keeps a dot-file whose whole name equals the
        # extension (e.g. ".gz") out of the result: it has a stem but no
        # extension, which is also how Path.suffix treats it.
        return any(name.endswith(ext) and len(name) > len(ext) for ext in wanted)

    return [p for p in all_files if _has_wanted_ext(p.name.lower())]

def extract_tcga_from_path(path_str: str):
    """Return the first TCGA-style ID found in ``path_str``, upper-cased.

    The search uses the module-level ``tcga_regex``, which is case-insensitive.
    The match is normalised to upper case so that the same case reached through
    differently-cased paths always produces the same dictionary key.

    Parameters
    ----------
    path_str : str or os.PathLike
        Text to search (a full path, a directory or a bare file name).

    Returns
    -------
    str or None
        The ID in upper case, or ``None`` when nothing matches or when the
        input is not text (for example a NaN read back from a CSV).
    """
    if isinstance(path_str, os.PathLike):
        path_str = os.fspath(path_str)
    if not isinstance(path_str, str):
        return None
    m = tcga_regex.search(path_str)
    return m.group(1).upper() if m else None

def extract_uuid_from_path(path_str: str):
    """Return the first component of ``path_str`` that is, on its own, a UUID.

    The path is split with ``Path(...).parts`` and each component is tested
    against the anchored module-level ``uuid_regex``; ``None`` is returned when
    no component qualifies.
    """
    components = Path(path_str).parts
    return next((part for part in components if uuid_regex.match(part)), None)

def find_sample_sheets(folder: Path):
    """Collect files below ``folder`` that look like sample sheets.

    A file qualifies when it has one of the extensions .xlsx/.xls/.csv/.tsv/.txt
    and its lower-cased name contains 'sample', 'sheet', 'manifest' or
    'file_id'. Results are grouped by extension, in the order listed above.
    A missing folder gives an empty list.
    """
    name_hints = ('sample', 'sheet', 'manifest', 'file_id')
    if not folder.exists():
        return []

    hits = []
    for suffix in ('.xlsx', '.xls', '.csv', '.tsv', '.txt'):
        hits.extend(
            found
            for found in folder.rglob(f"*{suffix}")
            if any(hint in found.name.lower() for hint in name_hints)
        )
    return hits

def load_sheet(path: Path):
    """Load a sample sheet into a DataFrame, choosing the reader by extension.

    Supported extensions (case-insensitive): .xlsx, .xls, .csv, and
    tab-separated .tsv/.txt.

    Parameters
    ----------
    path : Path
        File to read.

    Returns
    -------
    pandas.DataFrame or None
        The parsed sheet, or ``None`` when the extension is unsupported or the
        file cannot be read. A message is printed in both failure cases; no
        exception propagates to the caller.
    """
    suffix = path.suffix.lower()
    try:
        if suffix == '.xlsx':
            return pd.read_excel(path, engine='openpyxl')
        if suffix == '.xls':
            # openpyxl only understands the XML-based formats (.xlsx/.xlsm);
            # forcing it on a legacy .xls workbook always fails. Let pandas pick
            # the engine that matches the file.
            return pd.read_excel(path)
        if suffix == '.csv':
            return pd.read_csv(path)
        if suffix in ('.tsv', '.txt'):
            return pd.read_csv(path, sep='\t')
        print(f"  ! Unsupported sample sheet format {path}")
    except Exception as e:
        # Best effort by design: one unreadable sheet must not abort the scan.
        print(f"  ! Failed to read sample sheet {path}: {e}")
    return None


In [4]:
# =========== PHASE A: PHYSICAL INVENTORY ===========
# modality name -> list of per-file record dicts; filled by this cell and the
# scan cells that follow.
inventory = defaultdict(list)

# Radiology: prefer DICOM (.dcm) files. If the tree contains none, fall back to
# listing every file so that series stored without an extension are still captured.
rad_root = Path(DATA_PATHS['radiology'])
if not rad_root.exists():
    print(f"Radiology root not found: {rad_root}")
else:
    print("Scanning Radiology (this may take a while)...")
    dcm_files = find_files(rad_root, exts=['.dcm'])
    rad_files = dcm_files if dcm_files else find_files(rad_root, exts=None)
    # Both cases produce the same record layout, so one builder serves both.
    inventory['radiology'].extend(
        {
            'file_path': str(rad_file),
            'file_size_mb': size_mb(rad_file),
            'patient_tcga': extract_tcga_from_path(str(rad_file)),
            'parent_series_dir': str(rad_file.parent),
            'ext': rad_file.suffix.lower(),
        }
        for rad_file in rad_files
    )
    print(f"  Radiology files found: {len(inventory['radiology'])}")

Scanning Radiology (this may take a while)...
  Radiology files found: 48931


In [5]:
# Pathology: search .svs files across multiple shards
print("Scanning Pathology shards...")

# Drop results of a previous run of this cell so re-running it does not append
# every slide a second time. pop() (rather than assigning []) avoids creating an
# empty 'pathology' entry when no shard yields a file.
inventory.pop('pathology', None)

# Same 8-4-4-4-12 hex shape and case-insensitivity as uuid_regex, but unanchored
# so that a UUID embedded in a longer file name can be found.
svs_uuid_regex = re.compile(
    r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})',
    re.IGNORECASE,
)

for shard in DATA_PATHS['pathology']:
    shard_p = Path(shard)
    if not shard_p.exists():
        print(f"  Pathology shard missing: {shard}")
        continue
    svs_files = find_files(shard_p, exts=['.svs'])
    for f in svs_files:
        fn = f.name
        # Prefer an ID in the file name; fall back to one in the parent path.
        tcga = extract_tcga_from_path(fn) or extract_tcga_from_path(str(f.parent))
        # UUID from the file name, kept as a fallback key for the file-ID mapping.
        # Lower-cased so a match cannot be lost to letter case alone
        # (assumes the mapping keys are lower-case — TODO confirm).
        uuid_m = svs_uuid_regex.search(fn)
        uuid = uuid_m.group(1).lower() if uuid_m else None
        inventory['pathology'].append({
            'file_path': str(f),
            'file_size_mb': size_mb(f),
            'tcga': tcga,
            'uuid': uuid,
            'shard': shard,
            'ext': f.suffix.lower()
        })
    print(f"  Shard {shard}: {len(svs_files)} .svs files")


Scanning Pathology shards...
  Shard E:\CancerVision_Data\data\pathology\pathology_1: 40 .svs files
  Shard E:\CancerVision_Data\data\pathology\pathology_2: 36 .svs files
  Shard E:\CancerVision_Data\data\pathology\pathology_3: 43 .svs files
  Shard F:\FYP_Preparation\Data\pathalogy\pathology_4: 33 .svs files


In [6]:
def _genomics_records(files):
    """Build one inventory record per genomics file (shared by both scans below)."""
    return [
        {
            'file_path': str(f),
            'file_size_mb': size_mb(f),
            # UUID-named directory that contains the file, if there is one.
            'parent_uuid': extract_uuid_from_path(str(f.parent)),
            'ext': f.suffix.lower(),
        }
        for f in files
    ]


# Genomics - mutations (MAF files, plain or gzip-compressed)
mut_root = Path(DATA_PATHS['genomics_mutations'])
if mut_root.exists():
    # '.gz' already covers '.maf.gz' archives, so no separate entry is needed.
    # NOTE(review): '.zip' is NOT in this list, so zip archives are not
    # inventoried even though an earlier comment said they should be — add
    # '.zip' here if the downloads really are zipped per folder.
    maf_like = find_files(mut_root, exts=['.maf', '.gz', '.txt'])
    # Assign instead of append so re-running the cell cannot duplicate rows.
    inventory['genomics_mutations'] = _genomics_records(maf_like)
    print(f"  Mutations files found: {len(inventory['genomics_mutations'])}")
else:
    # Discard stale rows from an earlier run against a root that is now missing.
    inventory.pop('genomics_mutations', None)
    print(f"Mutations root not found: {mut_root}")

# Genomics - rnaseq
rn_root = Path(DATA_PATHS['genomics_rnaseq'])
if rn_root.exists():
    rn_files = find_files(rn_root, exts=['.tsv', '.txt', '.gz', '.csv'])
    inventory['genomics_rnaseq'] = _genomics_records(rn_files)
    print(f"  RNA-Seq files found: {len(inventory['genomics_rnaseq'])}")
else:
    inventory.pop('genomics_rnaseq', None)
    print(f"RNA-Seq root not found: {rn_root}")


  Mutations files found: 1239
  RNA-Seq files found: 692


In [7]:
# Clinical
cli_root = Path(DATA_PATHS['clinical'])
if cli_root.exists():
    cli_files = find_files(cli_root, exts=['.json', '.xml', '.tsv', '.txt', '.html', '.htm'])
    # Assign instead of append so re-running the cell cannot duplicate rows.
    inventory['clinical'] = [
        {
            'file_path': str(f),
            'file_size_mb': size_mb(f),
            # UUID-named directory that contains the file, if there is one.
            'parent_uuid': extract_uuid_from_path(str(f.parent)),
            'ext': f.suffix.lower(),
            'name': f.name,
        }
        for f in cli_files
    ]
    print(f"  Clinical files found: {len(inventory['clinical'])}")
else:
    # Discard stale rows from an earlier run against a root that is now missing.
    inventory.pop('clinical', None)
    print(f"Clinical root not found: {cli_root}")

# Save physical inventory CSVs (phase A outputs): one "<modality>_files.csv"
# per modality currently present in `inventory`.
for modality, files in inventory.items():
    modality_df = pd.DataFrame(files)
    outp = OUTPUT_DIR / f"{modality}_files.csv"
    modality_df.to_csv(outp, index=False)
    print(f"Saved {modality} listing: {outp} (rows: {len(modality_df)})")

  Clinical files found: 771
Saved radiology listing: inventory_output\radiology_files.csv (rows: 48931)
Saved pathology listing: inventory_output\pathology_files.csv (rows: 152)
Saved genomics_mutations listing: inventory_output\genomics_mutations_files.csv (rows: 1239)
Saved genomics_rnaseq listing: inventory_output\genomics_rnaseq_files.csv (rows: 692)
Saved clinical listing: inventory_output\clinical_files.csv (rows: 771)


=========== PHASE B: SAMPLE-SHEET MAPPING (semantic mapping) ===========
Strategy:
1) Search for sample sheets in clinical and genomics folders
2) Load sample sheets and try to find standardized columns: File ID, File Name, Case ID, Sample ID
3) Build mapping: file_id -> case_id (and sample_id)
4) Use mapping to annotate inventory rows where possible


In [11]:
import pandas as pd
from pathlib import Path

# 1. Location of the GDC sample sheet (machine-specific absolute path).
base_path = r"E:\CancerVision_Data\data\Clinical\gdc_sample_sheet.2025-11-07"

# The name may have been copied without its extension, so try the bare name
# first and then the same name with ".tsv" appended.
if Path(base_path).exists():
    SPECIFIC_SAMPLE_SHEET = Path(base_path)
elif Path(base_path + ".tsv").exists():
    SPECIFIC_SAMPLE_SHEET = Path(base_path + ".tsv")
else:
    SPECIFIC_SAMPLE_SHEET = None

maps = {}           # file_id -> case_id (a later row wins on duplicate file IDs)
mapping_rows = []   # one {'file_id', 'case_id'} dict per valid sheet row


def _collapse_repeated_ids(value):
    """Reduce 'X, X' (the same ID repeated, comma-separated) to 'X'.

    NOTE(review): GDC sample sheets appear to list the case once per associated
    sample (e.g. tumor/normal pairs give 'TCGA-AA-0001, TCGA-AA-0001'); confirm
    against the sheet. Values naming several DIFFERENT IDs are left untouched.
    """
    unique = list(dict.fromkeys(part.strip() for part in value.split(',') if part.strip()))
    return unique[0] if len(unique) == 1 else value


if SPECIFIC_SAMPLE_SHEET:
    print(f"✅ Found file at: {SPECIFIC_SAMPLE_SHEET}")
    try:
        # Tab-separated; dtype=str keeps IDs as text instead of letting pandas
        # coerce numeric-looking values.
        sheet_df = pd.read_csv(SPECIFIC_SAMPLE_SHEET, sep='\t', dtype=str)

        # Clean column names
        sheet_df.columns = [c.strip() for c in sheet_df.columns]

        file_id_col = 'File ID'
        case_id_col = 'Case ID'

        if file_id_col in sheet_df.columns and case_id_col in sheet_df.columns:
            for fid, cid in zip(sheet_df[file_id_col], sheet_df[case_id_col]):
                if pd.isna(fid) or pd.isna(cid):
                    continue
                fid, cid = fid.strip(), cid.strip()
                # Skip blanks and literal 'nan' placeholders.
                if not fid or not cid or fid.lower() == 'nan' or cid.lower() == 'nan':
                    continue
                cid = _collapse_repeated_ids(cid)
                maps[fid] = cid
                mapping_rows.append({'file_id': fid, 'case_id': cid})

            print(f"Successfully mapped {len(maps)} File IDs to Case IDs.")
        else:
            print(f"❌ Column mismatch. Available columns: {list(sheet_df.columns)}")

    except Exception as e:
        print(f"❌ Error loading file: {e}")
else:
    print("❌ CRITICAL ERROR: File not found.")
    print(f"Looked for: {base_path}")
    print(f"And: {base_path}.tsv")

# Save the result
if mapping_rows:
    mapping_df = pd.DataFrame(mapping_rows)
    mapping_df.to_csv(OUTPUT_DIR / "extracted_fileid_caseid_mapping.csv", index=False)
else:
    # Nothing is written in this case, so a mapping CSV left over from an
    # earlier run would be picked up silently by later cells.
    print("  ! No File ID -> Case ID pairs extracted; mapping CSV was NOT (re)written.")

✅ Found file at: E:\CancerVision_Data\data\Clinical\gdc_sample_sheet.2025-11-07.tsv
Successfully mapped 623 File IDs to Case IDs.


In [21]:
import pandas as pd
import re
from collections import defaultdict

# Rebuild the file-ID -> case-ID lookup from the CSV written by the sample-sheet
# cell, so this cell also works without that cell's in-memory `maps`.
mapping = pd.read_csv("inventory_output/extracted_fileid_caseid_mapping.csv")
fileid_to_case = {
    file_id: case_id
    for file_id, case_id in zip(mapping["file_id"], mapping["case_id"])
}

def detect_genomics_presence(genomics_df, fileid_map=None):
    """Flag which cases have at least one file listed in ``genomics_df``.

    A path is attributed to the first file ID (in mapping order) that occurs as
    a substring of it.

    NOTE(review): this function is defined in more than one cell of this
    notebook; the definition executed last wins. Keep them in sync or delete one.

    Parameters
    ----------
    genomics_df : pandas.DataFrame
        Must have a 'file_path' column.
    fileid_map : dict, optional
        file_id -> case_id. Defaults to the notebook-level ``fileid_to_case``,
        which is what existing one-argument calls rely on.

    Returns
    -------
    collections.defaultdict(int)
        case_id -> 1 for every case with a matching path. Reading a case that
        was not found yields 0 (and inserts that key).
    """
    if fileid_map is None:
        fileid_map = fileid_to_case

    case_presence = defaultdict(int)
    for path in genomics_df['file_path']:
        if not isinstance(path, str):
            continue  # e.g. NaN produced by an empty CSV cell
        matched_id = next(
            (fid for fid in fileid_map if isinstance(fid, str) and fid in path),
            None,
        )
        if matched_id is not None:
            case_presence[fileid_map[matched_id]] = 1
    return case_presence

# Reload the saved genomics listings and record, per case, whether any file of
# that modality is present.
mut_df = pd.read_csv("inventory_output/genomics_mutations_files.csv")
mut_presence = detect_genomics_presence(mut_df)

rna_df = pd.read_csv("inventory_output/genomics_rnaseq_files.csv")
rna_presence = detect_genomics_presence(rna_df)


In [22]:
# =========== ANNOTATE INVENTORY USING MAPPING ===========
def annotate_and_map(inventory, maps):
    """Group inventoried files by case ID using several mapping heuristics.

    Parameters
    ----------
    inventory : mapping
        modality name -> list of record dicts as built by the scan cells.
        Modalities read: 'radiology', 'pathology', 'genomics_mutations',
        'genomics_rnaseq', 'clinical'. Missing modalities are skipped.
    maps : dict
        file_id (UUID string) -> case_id.

    Returns
    -------
    collections.defaultdict
        case_id -> dict with the list-valued keys 'radiology_files',
        'pathology_files', 'genomics_mutations', 'genomics_rnaseq' and
        'clinical_files'. Files that cannot be attributed to a case are
        collected under the extra key 'files' of an ``UNMAPPED_<MODALITY>``
        entry.
    """
    # Unanchored UUID pattern: finds every UUID occurring anywhere in a path.
    uuid_anywhere = re.compile(
        r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
        re.IGNORECASE,
    )

    # case_id -> aggregated file lists, created on first access.
    case_info = defaultdict(lambda: {'radiology_files': [], 'pathology_files': [], 'genomics_mutations': [], 'genomics_rnaseq': [], 'clinical_files': []})

    def _unmapped(bucket, file_path):
        """Record a file that could not be attributed to any case."""
        case_info[bucket].setdefault('files', []).append(file_path)

    # 1) Radiology: use the ID stored at scan time, else retry on the full path.
    for row in inventory.get('radiology', []):
        tcga = row.get('patient_tcga') or extract_tcga_from_path(row.get('file_path', ''))
        if tcga:
            case_info[tcga]['radiology_files'].append(row['file_path'])
        else:
            # Key spelling fixed (was 'UNMAPPED_RADIOLGY').
            _unmapped('UNMAPPED_RADIOLOGY', row['file_path'])

    # 2) Pathology: ID from the scan, then the file-name UUID, then a
    #    UUID-named parent directory.
    for row in inventory.get('pathology', []):
        tcga = row.get('tcga')
        uuid = row.get('uuid')
        if tcga:
            case_info[tcga]['pathology_files'].append(row['file_path'])
        elif uuid and maps.get(uuid):
            case_info[maps[uuid]]['pathology_files'].append(row['file_path'])
        else:
            parent_uuid = extract_uuid_from_path(str(Path(row['file_path']).parent))
            if parent_uuid and maps.get(parent_uuid):
                case_info[maps[parent_uuid]]['pathology_files'].append(row['file_path'])
            else:
                _unmapped('UNMAPPED_PATHOLOGY', row['file_path'])

    # 3) Genomics mutations / rnaseq: the parent-directory UUID first, then
    #    EVERY UUID in the path. Testing only the first one would re-test the
    #    parent directory that has just failed and never reach a UUID embedded
    #    in the file name.
    for modality in ['genomics_mutations', 'genomics_rnaseq']:
        for row in inventory.get(modality, []):
            parent_uuid = row.get('parent_uuid')
            mapped = maps.get(parent_uuid) if parent_uuid else None
            if not mapped:
                for candidate in uuid_anywhere.findall(row.get('file_path', '')):
                    mapped = maps.get(candidate) or maps.get(candidate.lower())
                    if mapped:
                        break
            if mapped:
                case_info[mapped][modality].append(row['file_path'])
            else:
                _unmapped(f"UNMAPPED_{modality.upper()}", row['file_path'])

    # 4) Clinical: an ID in the file name wins, else the parent-directory UUID.
    for row in inventory.get('clinical', []):
        found_tcga_in_name = extract_tcga_from_path(row.get('name', ''))
        parent_uuid = row.get('parent_uuid')
        if found_tcga_in_name:
            case_info[found_tcga_in_name]['clinical_files'].append(row['file_path'])
        elif parent_uuid and maps.get(parent_uuid):
            case_info[maps[parent_uuid]]['clinical_files'].append(row['file_path'])
        else:
            # Looking inside the file for an ID is deferred; report it as unmapped.
            _unmapped('UNMAPPED_CLINICAL', row['file_path'])

    return case_info

In [23]:
# Group every inventoried file under its case ID (or an UNMAPPED_* bucket).
case_info = annotate_and_map(inventory=inventory, maps=maps)

In [26]:
import json
import pandas as pd
import os

# 1. Convert case_info into a manifest dataframe
# Only the first MAX_PATHS_PER_COLUMN paths per modality are stored in the
# *_paths columns; the *_count columns always hold the exact totals.
MAX_PATHS_PER_COLUMN = 10
# manifest column prefix -> key used inside each case_info entry
MODALITY_KEYS = {
    'radiology': 'radiology_files',
    'pathology': 'pathology_files',
    'genomics_mutations': 'genomics_mutations',
    'genomics_rnaseq': 'genomics_rnaseq',
    'clinical': 'clinical_files',
}
COUNT_COLUMNS = [f"{prefix}_count" for prefix in MODALITY_KEYS]
PATH_COLUMNS = [f"{prefix}_paths" for prefix in MODALITY_KEYS]

manifest_rows = []
for case_id, info in case_info.items():
    if case_id.startswith('UNMAPPED_'):
        continue  # skip unmapped aggregates in manifest

    row = {'case_id': case_id}
    for prefix, key in MODALITY_KEYS.items():
        row[f"{prefix}_count"] = len(info.get(key, []))
    for prefix, key in MODALITY_KEYS.items():
        row[f"{prefix}_paths"] = json.dumps(info.get(key, [])[:MAX_PATHS_PER_COLUMN])
    manifest_rows.append(row)

# 2. Create DataFrame (explicit columns keep the schema even with zero cases)
manifest_df = pd.DataFrame(manifest_rows, columns=['case_id'] + COUNT_COLUMNS + PATH_COLUMNS)
manifest_path = OUTPUT_DIR / "patient_manifest.csv"

# 3. Handle Permission/Locking Logic
try:
    # Attempt to save the file
    manifest_df.to_csv(manifest_path, index=False)
    print(f"✅ Patient manifest successfully saved to: {manifest_path}")

except PermissionError:
    print("\n❌ ERROR: Could not save the file because it is locked.")
    print(f"👉 ACTION REQUIRED: Please close '{manifest_path.name}' if it is open in Excel or another app, then run this cell again.")
except Exception as e:
    print(f"❌ An unexpected error occurred: {e}")

# 4. Display Results
if manifest_path.exists():
    if manifest_df.empty:
        print("\nNo mapped cases: the manifest is empty.")
    else:
        print("\n--- Preview: Top 20 Rows ---")
        # Show IDs and counts only: the JSON path columns are thousands of
        # characters wide and make the preview unreadable.
        print(manifest_df[['case_id'] + COUNT_COLUMNS].head(20).to_string(index=False))

        print("\n--- Data Summary ---")
        # DataFrame.info() prints by itself and returns None, so it must not
        # be wrapped in print().
        manifest_df.info()
        print("\n--- Numerical Statistics ---")
        print(manifest_df.describe())

✅ Patient manifest successfully saved to: inventory_output\patient_manifest.csv

--- Preview: Top 20 Rows ---
     case_id  radiology_count  pathology_count  genomics_mutations_count  genomics_rnaseq_count  clinical_count                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

In [16]:
# Save unmapped lists for manual inspection: one entry per UNMAPPED_* bucket,
# holding the paths that could not be attributed to a case.
unmapped_path = OUTPUT_DIR / "unmapped_files_summary.json"
unmapped = {
    bucket: entry.get('files', [])
    for bucket, entry in case_info.items()
    if bucket.startswith('UNMAPPED_')
}
with open(unmapped_path, 'w') as out_handle:
    json.dump(unmapped, out_handle, indent=2)
print("Saved unmapped files summary for manual review:", unmapped_path)

# Save full detailed inventory as JSON; default=str stringifies any value the
# json module cannot serialise natively.
inventory_path = OUTPUT_DIR / "detailed_inventory.json"
with open(inventory_path, 'w') as out_handle:
    json.dump(inventory, out_handle, indent=2, default=str)
print("Saved detailed inventory JSON.")

print("\nDONE. Inspect outputs in", OUTPUT_DIR)

Saved unmapped files summary for manual review: inventory_output\unmapped_files_summary.json
Saved detailed inventory JSON.

DONE. Inspect outputs in inventory_output


In [20]:
import pandas as pd
import re
from collections import defaultdict

# Rebuild the file-ID -> case-ID lookup from the CSV written by the sample-sheet
# cell, so this cell also works without that cell's in-memory `maps`.
mapping = pd.read_csv("inventory_output/extracted_fileid_caseid_mapping.csv")
fileid_to_case = {
    file_id: case_id
    for file_id, case_id in zip(mapping["file_id"], mapping["case_id"])
}

def detect_genomics_presence(genomics_df, fileid_map=None):
    """Flag which cases have at least one file listed in ``genomics_df``.

    A path is attributed to the first file ID (in mapping order) that occurs as
    a substring of it.

    NOTE(review): this function is defined in more than one cell of this
    notebook; the definition executed last wins. Keep them in sync or delete one.

    Parameters
    ----------
    genomics_df : pandas.DataFrame
        Must have a 'file_path' column.
    fileid_map : dict, optional
        file_id -> case_id. Defaults to the notebook-level ``fileid_to_case``,
        which is what existing one-argument calls rely on.

    Returns
    -------
    collections.defaultdict(int)
        case_id -> 1 for every case with a matching path. Reading a case that
        was not found yields 0 (and inserts that key).
    """
    if fileid_map is None:
        fileid_map = fileid_to_case

    case_presence = defaultdict(int)
    for path in genomics_df['file_path']:
        if not isinstance(path, str):
            continue  # e.g. NaN produced by an empty CSV cell
        matched_id = next(
            (fid for fid in fileid_map if isinstance(fid, str) and fid in path),
            None,
        )
        if matched_id is not None:
            case_presence[fileid_map[matched_id]] = 1
    return case_presence

# Reload the saved genomics listings and record, per case, whether any file of
# that modality is present.
mut_df = pd.read_csv("inventory_output/genomics_mutations_files.csv")
mut_presence = detect_genomics_presence(mut_df)

rna_df = pd.read_csv("inventory_output/genomics_rnaseq_files.csv")
rna_presence = detect_genomics_presence(rna_df)
