In [6]:
import polars as pl
import numpy as np
import pandas as pd
import os
import glob
import yaml

# View Unsorted Data

In [7]:
def print_parquet_sample(directory_path: str, sample_size: int = 5):
    parquet_file = None
    try:
        # Check if the directory exists
        if not os.path.isdir(directory_path):
            raise FileNotFoundError(f"Error: Directory not found at '{directory_path}'")

        # Find the first file with a .parquet extension
        for filename in os.listdir(directory_path):
            if filename.endswith(".parquet"):
                parquet_file = os.path.join(directory_path, filename)
                break  # Stop after finding the first one

        if not parquet_file:
            print(f"No .parquet files found in the directory: '{directory_path}'")
            return

        print(f"Reading sample from: {parquet_file}\n")

        # Read the Parquet file into a pandas DataFrame
        df = pd.read_parquet(parquet_file)

        # Print the first 'sample_size' rows of the DataFrame
        print("--- Sample of Parquet File ---")
        print(df.head(sample_size))
        print("-----------------------------\n")

    except ImportError:
        print("Error: The 'pandas' and 'pyarrow' (or 'fastparquet') libraries are required.")
        print("Please install them using: pip install pandas pyarrow")
    except FileNotFoundError as e:
        print(e)
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


In [12]:
unsorted_path = '/data/scratch/qc25022/pancreas/intermediate_unsorted/'
sorted_path = '/data/scratch/qc25022/pancreas/intermediate_sorted/'

In [13]:
print_parquet_sample(unsorted_path)

Reading sample from: /data/scratch/qc25022/pancreas/intermediate_unsorted/data.parquet

--- Sample of Parquet File ---
        e_patid        time  numunitid                         code  \
0  244323150658  2020-02-07       42.0         medcodeid//253914014   
1  244323150658  2020-02-07        NaN        medcodeid//1780301014   
2  244323150658  2020-02-07        NaN        medcodeid//1780300010   
3   46562350658  2021-06-14        NaN         medcodeid//253099017   
4  244323150658  2020-02-07        NaN  medcodeid//1572871000006117   

   numeric_value  
0           16.0  
1            NaN  
2            NaN  
3            NaN  
4            NaN  
-----------------------------



In [7]:
print_parquet_sample(sorted_path)

Reading sample from: /data/scratch/qc25022/pancreas/intermediate_sorted/136.parquet

--- Sample of Parquet File ---
     subject_id        time                        code  numeric_value  \
0  572672250535  2011-09-07        medcodeid//457965015           18.0   
1  572672250535  2011-09-07       medcodeid//1219389018            NaN   
2  572672250535  2011-09-07       medcodeid//1219389018            NaN   
3  572672250535  2011-09-07  medcodeid//714911000006118           32.9   
4  572672250535  2011-09-07       medcodeid//2478443019            4.7   

   numunitid  
0        NaN  
1        NaN  
2        NaN  
3        NaN  
4        NaN  
-----------------------------



# View Shard - whole

In [3]:
def view_parquet_shard(config_path: str, split_to_view: str, shard_number: int, num_rows: int = 10):
    """
    Reads a specific shard from the pipeline's output and prints its first N rows.

    Args:
        config_path: Path to the project's config.yaml file.
        split_to_view: The name of the split directory ('train', 'val', or 'test').
        shard_number: The number of the shard to view (e.g., 0 for 'shard_0.parquet').
        num_rows: The number of rows to print from the top of the file.
    """
    print(f"--- Viewing Shard {shard_number} from the '{split_to_view}' Split ---")

    # --- 1. Load Configuration to get the output directory ---
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        
        STUDY_PARAMS = config['study_params']
        cancer_type = STUDY_PARAMS['cancer_type']
        OUTPUTS = {key: val.format(cancer_type=cancer_type) for key, val in config.get('outputs', {}).items()}
        
        # Use the final mapped event stream directory
        output_base_dir = OUTPUTS.get('event_stream_dir')
        if not output_base_dir:
            print("Error: 'event_stream_dir' not found in config.yaml outputs.")
            return

    except FileNotFoundError:
        print(f"Error: Configuration file not found at '{config_path}'")
        return
    except Exception as e:
        print(f"An error occurred while reading the config file: {e}")
        return

    # --- 2. Construct the File Path ---
    split_map = {'train': 'train', 'val': 'tuning', 'test': 'held_out'}
    if split_to_view not in split_map:
        print(f"Error: Invalid split name '{split_to_view}'. Please use 'train', 'val', or 'test'.")
        return
        
    shard_filename = f"shard_{shard_number}.parquet"
    file_path = os.path.join(output_base_dir, split_map[split_to_view], shard_filename)

    # --- 3. Read and Print the Parquet File ---
    try:
        print(f"Reading file: {file_path}\n")
        
        # Read the parquet file into a Polars DataFrame
        df = pl.read_parquet(file_path)
        
        # Use a Polars context manager to ensure all rows/columns are displayed
        with pl.Config(tbl_rows=num_rows + 1, tbl_width_chars=200):
            print(df.head(num_rows))
            
    except FileNotFoundError:
        print(f"Error: File not found at '{file_path}'.")
        print("Please check that the path and shard number are correct.")
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        
    print("\n-------------------------------------------")

In [15]:
SPLIT = 'train'
    
# 2. The number of the shard file you want to view
SHARD_NUM = 0

# 3. How many rows you want to see
ROWS_TO_VIEW = 10

view_parquet_shard(
    config_path='/data/home/qc25022/cancer-extraction-pipeline/config.yaml',
    split_to_view=SPLIT,
    shard_number=SHARD_NUM,
    num_rows=ROWS_TO_VIEW
)

--- Viewing Shard 0 from the 'train' Split ---
Reading file: /data/scratch/qc25022/pancreas/event_streams/train/shard_0.parquet

shape: (10, 6)
┌────────────┬─────────────────────┬─────────────────────────────────┬───────────────┬────────────┬───────────┐
│ subject_id ┆ time                ┆ code                            ┆ numeric_value ┆ text_value ┆ numunitid │
│ ---        ┆ ---                 ┆ ---                             ┆ ---           ┆ ---        ┆ ---       │
│ i64        ┆ datetime[μs]        ┆ str                             ┆ f32           ┆ str        ┆ i64       │
╞════════════╪═════════════════════╪═════════════════════════════════╪═══════════════╪════════════╪═══════════╡
│ 1351284    ┆ 1944-01-01 00:00:00 ┆ MEDS_BIRTH                      ┆ null          ┆ null       ┆ null      │
│ 1351284    ┆ 2012-04-05 00:00:00 ┆ MEDICAL//non-smoker//397732011  ┆ null          ┆ null       ┆ null      │
│ 1351284    ┆ 2012-04-17 00:00:00 ┆ MEDICAL//H06..00//396090018     ┆ n

# View Shard- Patient Level

In [4]:
def view_patient_data_from_shards(patient_id, output_dir, splits_df):
    """
    Loads and prints the final processed data for a specific patient
    by searching for them within the correct output shard file.
    """
    print(f"\n--- Searching for Patient ID: {patient_id} in sharded output ---")
    
    # --- Step 1: Find the patient's split ---
    patient_split_info = splits_df.filter(pl.col("subject_id") == patient_id)
    
    if patient_split_info.is_empty():
        print(f"Error: Patient ID {patient_id} not found in the subject information file.")
        return
        
    split = patient_split_info.get_column("split")[0]
    
    if split not in SPLIT_MAP:
        print(f"Error: Unknown split '{split}' for patient {patient_id}.")
        return

    # --- Step 2: Search for the patient in the correct split directory ---
    subdir = SPLIT_MAP[split]
    search_dir = os.path.join(output_dir, subdir)
    
    # Use glob to find all shard files in the directory
    shard_files = glob.glob(os.path.join(search_dir, "shard_*.parquet"))
    
    if not shard_files:
        print(f"Error: No shard files found in directory '{search_dir}'.")
        return

    patient_found = False
    for shard_path in shard_files:
        try:
            # Lazily scan the shard file to efficiently check if the patient is in it
            shard_lf = pl.scan_parquet(shard_path)
            patient_data_lf = shard_lf.filter(pl.col('subject_id') == patient_id)
            
            # Collect the data. If the resulting DataFrame is not empty, we found the patient.
            patient_df = patient_data_lf.collect()
            
            if not patient_df.is_empty():
                print(f"Found patient in shard: {os.path.basename(shard_path)}")
                # Use a Polars context manager to ensure all rows are printed
                with pl.Config(tbl_rows=-1):
                    print(patient_df)
                patient_found = True
                break # Stop searching once the patient is found
        except Exception as e:
            # Continue to the next shard if there's an error reading one
            print(f"An error occurred while reading shard '{shard_path}': {e}")
            continue 

    if not patient_found:
        print(f"Error: Patient ID {patient_id} was not found in any shard in the '{split}' directory.")
    
    print("---------------------------------------------------\n")

In [8]:
final_output_dir = '/data/scratch/qc25022/pancreas/event_streams/'
all_subjects_df = pl.read_csv('/data/home/qc25022/cancer-extraction-pipeline/output/pancreas_study/subject_information.csv')
SPLITS_PATH = os.path.join('/data/home/qc25022/cancer-extraction-pipeline/output/pancreas_study/subject_information.csv')
OUTPUT_DIR = os.path.join('/data/scratch/qc25022/pancreas/', "intermediate_sorted")
SPLIT_MAP = {'train': 'train', 'val': 'tuning', 'test': 'held_out'}
splits = pl.read_csv(SPLITS_PATH)

view_patient_data_from_shards(362864450976, final_output_dir, all_subjects_df)
# 1351284
#786765851213
#953185950729
#362864450976
#214978451257
#475110050793
#1436299550143
#835661550624
#1141423451238


--- Searching for Patient ID: 362864450976 in sharded output ---
Found patient in shard: shard_15.parquet
shape: (276, 6)
┌──────────────┬──────────────┬───────────────────────────┬───────────────┬────────────┬───────────┐
│ subject_id   ┆ time         ┆ code                      ┆ numeric_value ┆ text_value ┆ numunitid │
│ ---          ┆ ---          ┆ ---                       ┆ ---           ┆ ---        ┆ ---       │
│ i64          ┆ datetime[μs] ┆ str                       ┆ f32           ┆ str        ┆ i64       │
╞══════════════╪══════════════╪═══════════════════════════╪═══════════════╪════════════╪═══════════╡
│ 362864450976 ┆ 1944-01-01   ┆ MEDS_BIRTH                ┆ null          ┆ null       ┆ null      │
│              ┆ 00:00:00     ┆                           ┆               ┆            ┆           │
│ 362864450976 ┆ 2004-10-22   ┆ MEDICAL//Coronary_Heart_D ┆ null          ┆ null       ┆ null      │
│              ┆ 00:00:00     ┆ iseas…                    ┆          

In [25]:
final_output_dir = '/data/scratch/qc25022/pancreas/final_cleaned_events/'
view_patient_data_from_shards(582909951294, final_output_dir, all_subjects_df)

#864974450800
	
#582909951294


--- Searching for Patient ID: 582909951294 in sharded output ---
Found patient in shard: shard_24.parquet
shape: (1_317, 5)
┌──────────────┬─────────────────────┬────────────────────────────────┬───────────────┬────────────┐
│ subject_id   ┆ time                ┆ code                           ┆ numeric_value ┆ text_value │
│ ---          ┆ ---                 ┆ ---                            ┆ ---           ┆ ---        │
│ i64          ┆ datetime[μs]        ┆ str                            ┆ f32           ┆ str        │
╞══════════════╪═════════════════════╪════════════════════════════════╪═══════════════╪════════════╡
│ 582909951294 ┆ 1926-01-01 00:00:00 ┆ MEDS_BIRTH                     ┆ null          ┆ null       │
│ 582909951294 ┆ 2007-06-15 00:00:00 ┆ MEDICAL//44D..00//1495289019   ┆ null          ┆ null       │
│ 582909951294 ┆ 2007-06-15 00:00:00 ┆ MEASUREMENT//425..00//17835210 ┆ 0.45          ┆ null       │
│              ┆                     ┆ …                           