# Radiology Data Quality Assessment Through the Five Facets Framework

**Assignment Part II — Data Quality Analysis**

In this assignment, you will:
1. Learn about the **Five Facets of Data Quality Assessment** framework
2. Explore a synthetic radiology event log with intentional quality issues
3. Work within the context of three research questions (RQs)
4. **Choose and run** DQ assessment methods from a provided toolkit
5. Analyse results through the lens of the five facets
6. Manually explore the data to identify additional quality dimensions
7. Reflect on the limitations of automated assessment and event logs

**Important notes:**
- You will write ~20-25% code (basic exploration, running analyses, creating visualisations)
- Focus is on **choosing appropriate methods**, **interpreting results**, and **critical analysis**
- **All numbered "Reflection Questions" should be answered in your main report (Deliverable 7), not in this notebook**
- There is no single "correct" set of methods — you must justify your choices
- In-notebook analyses are brief; deeper reflections go in your report

## Part 0: Understanding the Context

### Three Research Questions (RQs) Guiding This Assignment

Your DQ assessment will be contextualised by three hypothetical research questions:

**RQ1: Workflow Efficiency & Bottlenecks**  
*"What are the average turnaround times for different stages of the radiology workflow, and where do delays occur?"*
- **Task context**: Performance monitoring for department management
- **Human stakeholders**: Radiology managers, administrators

**RQ2: Quality of Care & Critical Finding Communication**  
*"How consistently are critical radiological findings communicated to ordering clinicians, and within what timeframes?"*
- **Task context**: Patient safety monitoring and regulatory compliance
- **Human stakeholders**: Radiologists, clinicians, patient safety officers

**RQ3: AI-Based Triage System Development**  
*"Can we develop an automated triage system to prioritise cases by urgency and complexity, routing them to appropriate radiologists?"*
- **Task context**: Clinical decision support and workflow optimisation
- **Human stakeholders**: Radiologists, AI developers, workflow coordinators

As you assess data quality, consider: **Which RQ(s) would be affected by the quality issues you find?**

## Part 1: Setup and Data Loading

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import re

# Schema requirements (metadata)
REQUIRED_COLUMNS = [
    'case_id', 'accession_number', 'event_time', 'activity',
    'fhir_resource', 'actor', 'role',
]

EXPECTED_ACTIVITIES = [
    'Order placed', 'Order accepted', 'Patient selected from worklist',
    'Image acquisition start', 'Image acquisition end', 'Images uploaded to PACS',
    'Case selected', 'Preliminary report created', 'Report validated and approved',
    'Report finalised', 'Report distributed', 'Critical communication',
]

LOG_FILE = 'radiology_event_log.csv'

In [None]:
def load_event_log(filepath: str) -> pd.DataFrame:
    """
    Load CSV and parse event_time as datetime.
    
    Hint: Use pd.read_csv() and pd.to_datetime()
    """
    df = pd.read_csv(filepath)
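    # errors='coerce' turns unparseable timestamps into NaT instead of raising,
    # so they can be counted later (e.g. by the schema validation check)
    df['event_time'] = pd.to_datetime(df['event_time'], errors='coerce')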
    return df

# Load the data
df = load_event_log(LOG_FILE)
print(f"Loaded {len(df)} events across {df['case_id'].nunique()} cases")
print(f"Time range: {df['event_time'].min()} to {df['event_time'].max()}")
df.head()
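
Because the log contains intentional quality issues, some `event_time` values may not parse as timestamps. The following is a minimal sketch, on made-up values rather than the real log, of how `pd.to_datetime(..., errors='coerce')` converts unparseable entries to `NaT` so that they can be counted instead of raising an error. The sample strings and variable names are assumptions for illustration.

```python
import pandas as pd

# Hypothetical raw timestamp strings; only the first one is well-formed
raw_times = pd.Series(['2024-03-01 08:15:00', 'not a timestamp', None])

# errors='coerce' maps anything unparseable (and missing values) to NaT
parsed = pd.to_datetime(raw_times, errors='coerce')

n_unparseable = int(parsed.isna().sum())
print(f"{n_unparseable} of {len(parsed)} timestamps could not be parsed")
```

Counting `NaT` values right after loading gives a first, rough indication of timestamp quality before any formal method is run.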

### 1.1 Initial Data Exploration

Before running formal DQ assessments, explore the data manually: look at the columns, data types, and value distributions. You can also open the .csv file directly. This corresponds to the **Data Facet**: understanding what data you have.

In [None]:
# TODO: Explore the data structure
# Write code to answer these questions or write your own exploratory questions and code:

# 1. What are the data types of each column?
# YOUR CODE HERE

# 2. What are the unique values in key columns: 'activity', 'role', 'fhir_resource', 'modality'?
# YOUR CODE HERE

# 3. Are there any obvious missing values (NaN, None)? Which columns?
# YOUR CODE HERE

# 4. How many events per case on average? What's the distribution?
# YOUR CODE HERE

# 5. What is the time span covered by this log?
# YOUR CODE HERE


**→ Reflection Question 1** (answer in your main report):  
What kind of data is logged in the file? Assuming data quality is good, how can the data be used to answer the RQs? 

## Part 2: The DQ Assessment Toolkit

Below are **pre-implemented** functions for assessing various DQ dimensions. 

**Your task**: 
1. Review all available methods
2. **Choose 4-5 methods** to run (you don't need to run all of them)
3. Consider which methods are most relevant to the RQs
4. Run the methods and analyse results
5. Justify your choices

### 2.1 Available DQ Assessment Methods

#### **Method 1: Schema Validation**
- **DQ Dimension**: Completeness (schema-level)
- **Description**: Checks if required columns exist, if event_time is valid, if activities are known

In [2]:
def method_1_schema_validation(df: pd.DataFrame) -> Dict[str, bool]:
    """
    Validate schema-level completeness.
    Checks against schema requirements (metadata)
    Reflects how the system enforces data structure
    """
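    has_time = 'event_time' in df.columns
    has_activity = 'activity' in df.columns
    results = {
        'columns_present': all(col in df.columns for col in REQUIRED_COLUMNS),
        # Guarded lookups: a missing column reports False instead of raising KeyError
        'datetime_valid': bool(has_time and df['event_time'].notna().all()),
        'activities_known': bool(has_activity and df['activity'].isin(EXPECTED_ACTIVITIES).all()),
    }
    return results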

#### **Method 2: Completeness — Missing Values Analysis**
- **DQ Dimension**: Completeness (attribute-level)
- **Description**: Identifies missing values, including hidden placeholders

In [3]:
def method_2_completeness_missing_values(df: pd.DataFrame) -> Dict[str, object]:
    """
    Assess completeness by identifying missing and hidden missing values.
    Detects NaN, empty strings, and potential placeholders.
    Missing values may indicate issues in the data collection process.
    """
    results = {
        'missing_counts': df.isnull().sum().to_dict(),
        'empty_strings': {col: (df[col] == '').sum() for col in df.select_dtypes(include='object').columns},
        'potential_placeholders': {}
    }
    
    # Check for common placeholder values
    for col in df.select_dtypes(include='object').columns:
        placeholders = df[col].isin(['N/A', 'n/a', 'NA', 'Unknown', 'UNKNOWN', '-', '--', '99', '-99']).sum()
        if placeholders > 0:
            results['potential_placeholders'][col] = placeholders
    
    return results
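
To see why hidden placeholders need a separate check, here is a minimal sketch on a made-up `role` column (the values are assumptions, not taken from the real log). It applies the same three tests as Method 2 and shows that `isnull()` alone catches only one of the four problematic entries.

```python
import pandas as pd

# Hypothetical 'role' values mixing a real entry with three kinds of missingness
toy = pd.DataFrame({'role': ['Radiologist', 'N/A', '', None, 'Unknown']})

# Same placeholder list as in Method 2
placeholder_values = ['N/A', 'n/a', 'NA', 'Unknown', 'UNKNOWN', '-', '--', '99', '-99']

true_missing = int(toy['role'].isnull().sum())                  # only the None entry
empty_strings = int((toy['role'] == '').sum())                  # the '' entry
placeholders = int(toy['role'].isin(placeholder_values).sum())  # 'N/A' and 'Unknown'

print(true_missing, empty_strings, placeholders)
```

When interpreting Method 2 on the real log, it is worth adding the three counts per column rather than reading `missing_counts` in isolation.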

#### **Method 3: Accuracy — ICD-11 Code Validation**
- **DQ Dimension**: Accuracy (syntactic correctness)
- **Description**: Validates ICD-11 code format and placement

In [4]:
def method_3_accuracy_icd11_validation(df: pd.DataFrame) -> Dict[str, object]:
    """
    Validate ICD-11 codes for format and context accuracy.
    Checks code format against ICD-11 standard (reference data)
    Informs system's data entry validation
    """
    results = {
        'codes_in_wrong_activity': [],
        'invalid_format_codes': [],
        'free_text_in_code_field': []
    }
    
    if 'icd11_code' not in df.columns:
        return results
    
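    # Simplified ICD-11 format: 2 uppercase letters + digits + optional dot + digits.
    # Real ICD-11 stem codes can also start with a digit (e.g. 1A00) or use letters
    # after the dot, so this check is stricter than the standard itself.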
    icd11_pattern = re.compile(r'^[A-Z]{2}[0-9]+(\.[0-9]+)?$')
    
    for idx, row in df.iterrows():
        code = row.get('icd11_code', '')
        if pd.isna(code) or code == '':
            continue
            
        # Check if code appears in wrong activity
        if code and row['activity'] != 'Report finalised':
            results['codes_in_wrong_activity'].append((row['case_id'], row['activity'], code))
        
        # Check format validity
        if code and not icd11_pattern.match(str(code)):
            # Distinguish between format errors and free text
            if len(str(code).split()) > 1 or not any(c.isdigit() for c in str(code)):
                results['free_text_in_code_field'].append((row['case_id'], code))
            else:
                results['invalid_format_codes'].append((row['case_id'], code))
    
    return results
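
The behaviour of the format check is easiest to judge on a few concrete strings. The sketch below reuses the simplified pattern from Method 3 on hypothetical sample entries (none are taken from the real log) and records which ones it accepts.

```python
import re

# Same simplified pattern as used in Method 3
icd11_pattern = re.compile(r'^[A-Z]{2}[0-9]+(\.[0-9]+)?$')

# Hypothetical sample entries for illustration only
samples = ['CA40.0', 'ca40.0', 'CA40', 'pneumonia', 'CA40.Z', '1A00']
matches = {code: bool(icd11_pattern.match(code)) for code in samples}

for code, ok in matches.items():
    print(f"{code!r}: {'accepted' if ok else 'rejected'}")
```

Entries such as `'1A00'` are rejected by this simplified pattern even though ICD-11 stem codes of that shape exist, so a format failure is best treated as a prompt for manual review rather than proof of an invalid code.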

#### **Method 4: Representativity — Actors, Roles, and Workforce Equity**
- **DQ Dimension**: Representativity
- **Description**: Assesses whether all expected actors/roles are represented and whether workload is distributed equitably

In [5]:
def method_4_representativity_actors_equity(df: pd.DataFrame) -> Dict[str, object]:
    """
    Assess representativity of actors and roles in the log, including equity analysis.
    
    Analyses:
    - Distribution of actor/role attributes 
    - Whether data represents expected workflow participants
    - Different human roles have different patterns
    - Workload distribution equity across staff
    - Gender balance if available
    - Role representation completeness
    """
    results = {
        'unique_actors': df['actor'].nunique(),
        'unique_roles': df['role'].nunique(),
        'actor_distribution': df['actor'].value_counts().to_dict(),
        'role_distribution': df['role'].value_counts().to_dict(),
        'activities_per_role': df.groupby('role')['activity'].unique().to_dict()
    }
    
    # Workload equity analysis
    cases_per_actor = df.groupby('actor')['case_id'].nunique()
    if len(cases_per_actor) > 0:
        results['workload_equity'] = {
            'mean_cases_per_actor': cases_per_actor.mean(),
            'std_cases_per_actor': cases_per_actor.std(),
            'min_cases': cases_per_actor.min(),
            'max_cases': cases_per_actor.max(),
            'gini_coefficient': None  # Could calculate if needed for detailed equity analysis
        }
        
        # Identify potential equity issues (extreme outliers)
        q1 = cases_per_actor.quantile(0.25)
        q3 = cases_per_actor.quantile(0.75)
        iqr = q3 - q1
        overloaded = cases_per_actor[cases_per_actor > q3 + 1.5 * iqr]
        underloaded = cases_per_actor[cases_per_actor < q1 - 1.5 * iqr]
        
        results['workload_outliers'] = {
            'overloaded_actors': overloaded.to_dict(),
            'underloaded_actors': underloaded.to_dict()
        }
    
    # Role completeness: Are all expected clinical roles present?
    expected_roles = ['Radiographer', 'Radiologist', 'Resident', 'Clerk']  # Adjust based on context
    present_roles = set(df['role'].dropna().unique())
    results['role_completeness'] = {
        'expected_roles': expected_roles,
        'present_roles': list(present_roles),
        'missing_roles': list(set(expected_roles) - present_roles)
    }
    
    return results
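
Method 4 leaves `gini_coefficient` as `None`. If a single inequality figure is useful for your analysis, one possible way to compute it is sketched below. The helper name `gini` and the sample counts are assumptions for illustration; the formula is the standard rank-based Gini coefficient for non-negative values.

```python
import numpy as np

def gini(values) -> float:
    """Gini coefficient of non-negative values: 0 = perfectly equal, close to 1 = concentrated."""
    x = np.sort(np.asarray(values, dtype=float))
    n = x.size
    if n == 0 or x.sum() == 0:
        return float('nan')  # undefined for empty or all-zero input
    ranks = np.arange(1, n + 1)
    return float(2 * np.sum(ranks * x) / (n * x.sum()) - (n + 1) / n)

# Hypothetical cases-per-actor counts
equal_load = gini([10, 10, 10, 10])
skewed_load = gini([0, 0, 0, 10])
print(equal_load, skewed_load)
```

On the real log, the input would be something like `df.groupby('actor')['case_id'].nunique()`. Keep in mind that a high value may reflect role differences or incomplete actor attribution rather than genuine inequity.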

#### **Method 5: Representativity — Case Mix by Modality**
- **DQ Dimension**: Representativity
- **Description**: Assesses whether different imaging modalities are adequately represented

In [6]:
def method_5_representativity_modality(df: pd.DataFrame) -> Dict[str, object]:
    """
    Assess representativity of imaging modalities.
    Distribution of modality values
    Different modalities represent different case types/complexity and have different data requirements. E.g., mobile X-ray (DX) has to have a location note.
    """
    # Get modality from upload events
    upload_events = df[df['activity'] == 'Images uploaded to PACS'].copy()
    
    if 'modality' not in upload_events.columns:
        return {'error': 'Modality column not found'}
    
    results = {
        'total_cases': upload_events['case_id'].nunique(),
        'modality_distribution': upload_events['modality'].value_counts().to_dict(),
        'modality_percentages': (upload_events['modality'].value_counts(normalize=True) * 100).to_dict(),
        'missing_modality': upload_events['modality'].isna().sum()
    }
    return results

#### **Method 6: Representativity — Case Characteristics, Equipment, and Patient Demographics**
- **DQ Dimension**: Representativity
- **Description**: Assesses representativity across case urgency, anatomical regions, patient demographics, and equipment usage

In [7]:
def method_6_representativity_comprehensive(df: pd.DataFrame) -> Dict[str, object]:
    """
    Assess comprehensive representativity across multiple dimensions:
    - Case urgency levels (routine, urgent, stat)
    - Anatomical regions/body parts examined
    - Time of day / shift patterns
    - Equipment usage (if logged)
    - Patient demographics (age groups, if available)
    
    This informs:
    - Whether training data for AI (RQ3) represents all case types
    - Whether performance metrics (RQ1) cover all scenarios
    - Ethical considerations: are certain patient groups under-represented?
    """
    results = {}
    
    # Urgency/Priority analysis from notes or case attributes
    if 'notes' in df.columns:
        # Extract urgency markers (case=False already covers upper- and lowercase)
        df_temp = df.copy()
        notes = df_temp['notes'].astype(str)
        df_temp['has_stat'] = notes.str.contains('stat|urgent', case=False, na=False)
        df_temp['has_routine'] = notes.str.contains('routine', case=False, na=False)
        
        urgency_cases = df_temp.groupby('case_id').agg({
            'has_stat': 'any',
            'has_routine': 'any'
        })
        
        results['urgency_distribution'] = {
            'stat_urgent_cases': urgency_cases['has_stat'].sum(),
            'routine_cases': urgency_cases['has_routine'].sum(),
            # Count cases with neither marker; subtracting the two sums would
            # double-count cases that carry both markers
            'unspecified': (~urgency_cases['has_stat'] & ~urgency_cases['has_routine']).sum(),
            'total_cases': len(urgency_cases)
        }
    
    # Anatomical region analysis (if body_part column exists)
    if 'body_part' in df.columns:
        body_part_dist = df[df['activity'] == 'Images uploaded to PACS']['body_part'].value_counts()
        results['anatomical_distribution'] = body_part_dist.to_dict()
        results['anatomical_coverage'] = {
            'unique_body_parts': len(body_part_dist),
            'missing_body_part': df[df['activity'] == 'Images uploaded to PACS']['body_part'].isna().sum()
        }
    
    # Temporal representativity: Are all shifts/times of day represented?
    # Sort by time so that .first() below picks each case's earliest event
    df_temp = df.sort_values('event_time').copy()
    df_temp['hour'] = df_temp['event_time'].dt.hour
    # right=False gives left-closed bins: Night 00-07, Day 08-15, Evening 16-23
    df_temp['shift'] = pd.cut(df_temp['hour'], bins=[0, 8, 16, 24], labels=['Night', 'Day', 'Evening'], right=False)
    
    shift_cases = df_temp.groupby('case_id')['shift'].first().value_counts()
    results['temporal_distribution'] = {
        'cases_by_shift': shift_cases.to_dict(),
        'shift_balance': (shift_cases / shift_cases.sum()).to_dict()  # share of cases per shift
    }
    
    # Equipment diversity (if station_name or device_id available)
    if 'station_name' in df.columns:
        equipment_usage = df[df['activity'].str.contains('acquisition', case=False, na=False)]['station_name'].value_counts()
        results['equipment_usage'] = {
            'unique_stations': len(equipment_usage),
            'distribution': equipment_usage.to_dict(),
            'concentration': equipment_usage.max() / equipment_usage.sum() if len(equipment_usage) > 0 else 0
        }
    
    # Patient demographics (if available - age, gender)
    # Note: Often not in event logs due to privacy, but important for ethics analysis
    if 'patient_age' in df.columns:
        ages = df.groupby('case_id')['patient_age'].first().dropna()
        results['age_distribution'] = {
            'mean_age': ages.mean(),
            # include_lowest=True keeps patients with age 0 in the 'Pediatric' group
            'age_groups': pd.cut(ages, bins=[0, 18, 45, 65, 100], labels=['Pediatric', 'Adult', 'Middle-aged', 'Elderly'], include_lowest=True).value_counts().to_dict()
        }
    
    # Ethical consideration: Data concentration
    results['ethical_notes'] = {
        'comment': 'Check if certain patient groups, times, or case types are systematically under-represented. This affects fairness of AI systems and generalisability of performance metrics.'
    }
    return results
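
How boundary hours such as 08:00 and 16:00 are assigned to a shift depends on which side of each `pd.cut` bin is closed. The following minimal sketch, using a handful of hand-picked hours, compares right-closed bins (the pandas default) with left-closed bins. The variable names are illustrative assumptions.

```python
import pandas as pd

hours = pd.Series([0, 7, 8, 15, 16, 23])
bins = [0, 8, 16, 24]
labels = ['Night', 'Day', 'Evening']

# Right-closed bins (pandas default): [0, 8], (8, 16], (16, 24]
right_closed = pd.cut(hours, bins=bins, labels=labels, include_lowest=True)
# Left-closed bins: [0, 8), [8, 16), [16, 24)
left_closed = pd.cut(hours, bins=bins, labels=labels, right=False)

comparison = pd.DataFrame({'hour': hours, 'right_closed': right_closed, 'left_closed': left_closed})
print(comparison)
```

The two variants disagree only at hours 8 and 16, but in a log with many events exactly at shift changes that can noticeably shift the reported balance, so it is worth stating which convention you used.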

#### **Method 7: Conformance Check — Temporal Sequence**
- **DQ Dimension**: Consistency (workflow conformance)
- **Description**: Verifies activities occur in expected order with plausible timing

In [8]:
def method_7_conformance_temporal_sequence(df: pd.DataFrame) -> pd.DataFrame:
    """
    Temporal sequence conformance.

    Full workflow temporal sequence with optional resident path:
        Order placed < Order accepted < Patient selected from worklist < Image acquisition start 
        < Image acquisition end < Images uploaded to PACS < Case selected
        < [Optional for resident cases: Preliminary report created < Report validated and approved]
        < Report finalised < Report distributed

    Timestamp ordering and values
    Expected workflow sequence (fitness for clinical process)
    Whether logging accurately captured event order
    
    This method specifically helps identify BATCH LOGGING issues:
    - Multiple events at identical timestamps
    - All events clustered within very short timeframe (e.g., < 1 minute for entire case)
    - Events recorded at suspicious times (e.g., all at 17:00 - end of shift)
    """
    required_sequence = [
        'Order placed', 'Order accepted', 'Patient selected from worklist',
        'Image acquisition start', 'Image acquisition end', 'Images uploaded to PACS',
        'Case selected', 'Report finalised', 'Report distributed'
    ]
    
    results = []
    
    for case_id, case_df in df.groupby('case_id'):
        case_df = case_df.sort_values('event_time')
        violations = []
        
        # Check all required activities exist
        missing = set(required_sequence) - set(case_df['activity'].values)
        if missing:
            # sorted() makes the message deterministic (set iteration order is not)
            violations.append(f"Missing activities: {', '.join(sorted(missing))}")
        
        # Check sequence order
        activity_times = {}
        for activity in required_sequence:
            rows = case_df[case_df['activity'] == activity]
            if len(rows) > 0:
                activity_times[activity] = rows['event_time'].iloc[0]
        
        for i in range(len(required_sequence) - 1):
            curr = required_sequence[i]
            next_act = required_sequence[i + 1]
            if curr in activity_times and next_act in activity_times:
                if activity_times[curr] >= activity_times[next_act]:
                    violations.append(f"VIOLATION: {curr} >= {next_act}")
        
        # BATCH LOGGING DETECTION: Check for duplicate timestamps
        duplicates = case_df[case_df.duplicated('event_time', keep=False)]
        if len(duplicates) > 0:
            unique_dup_times = duplicates['event_time'].nunique()
            violations.append(f"WARNING - BATCH LOGGING: {len(duplicates)} events share {unique_dup_times} timestamp(s)")
        
        # BATCH LOGGING DETECTION: Check for suspiciously short case duration
        if len(case_df) > 3:  # Only check if case has multiple events
            case_duration = (case_df['event_time'].max() - case_df['event_time'].min()).total_seconds() / 60
            if case_duration < 1:  # All events within 1 minute
                violations.append(f"WARNING - BATCH LOGGING: All {len(case_df)} events within {case_duration:.2f} minutes (likely batch entry)")
        
        # BATCH LOGGING DETECTION: Check for end-of-shift logging pattern
        event_hours = case_df['event_time'].dt.hour.unique()
        event_minutes = case_df['event_time'].dt.minute.unique()
        if len(event_hours) == 1 and event_hours[0] in [17, 18, 8, 9]:  # Shift change times
            if len(event_minutes) <= 2:  # All events within 2 minute values
                violations.append(f"WARNING - BATCH LOGGING: All events at {event_hours[0]:02d}:xx (end-of-shift pattern)")
        
        # Check for implausible gaps
        if 'Image acquisition end' in activity_times and 'Images uploaded to PACS' in activity_times:
            gap = (activity_times['Images uploaded to PACS'] - activity_times['Image acquisition end']).total_seconds() / 60
            if gap > 120:  # More than 2 hours
                violations.append(f"WARNING: Suspicious gap between acquisition and upload: {gap:.0f} minutes")
        
        results.append({
            'case_id': case_id,
            'c1_passed': len(violations) == 0,
            'c1_violations': '; '.join(violations) if violations else 'None'
        })
    
    return pd.DataFrame(results)
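
The duplicate-timestamp check at the heart of the batch logging detection can also be expressed without a per-case loop. The sketch below is a minimal illustration on a hypothetical two-case log (the data and the column name `shared_timestamp` are assumptions), not a replacement for Method 7.

```python
import pandas as pd

# Hypothetical log: case 'A' has spread-out events, case 'B' was entered in one go
toy_log = pd.DataFrame({
    'case_id': ['A', 'A', 'A', 'B', 'B', 'B'],
    'activity': ['Order placed', 'Order accepted', 'Report finalised'] * 2,
    'event_time': pd.to_datetime([
        '2024-03-01 08:00', '2024-03-01 08:10', '2024-03-01 09:30',
        '2024-03-01 17:00', '2024-03-01 17:00', '2024-03-01 17:00',
    ]),
})

# Flag every event whose (case, timestamp) pair occurs more than once
toy_log['shared_timestamp'] = toy_log.duplicated(['case_id', 'event_time'], keep=False)
shared_per_case = toy_log.groupby('case_id')['shared_timestamp'].sum()
print(shared_per_case)
```

A per-case count like this makes it easy to rank cases by how strongly they are affected before reading the individual violation messages.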

#### **Method 8: Conformance Check — Critical Finding Communication**
- **DQ Dimension**: Timeliness (task-specific)
- **Description**: Ensures critical findings are communicated within acceptable timeframe
- **Important**: When using this method, you should specify the `threshold_minutes` parameter and reflect on your choice. The default of 15 minutes may not be appropriate for all contexts.

In [9]:
def method_8_conformance_critical_communication(df: pd.DataFrame, threshold_minutes: int = 15) -> pd.DataFrame:
    """
    Critical findings should be communicated in a timely manner.
    
    Parameters:
    -----------
    df : pd.DataFrame
        Event log dataframe
    threshold_minutes : int, default=15
        Maximum acceptable time (in minutes) between report finalisation and critical communication.

        
    Facet considerations:
    - Clinical requirement for urgent communication 
    - Timestamp accuracy and 'notes' field parsing 
    - What humans consider "critical" and "timely"
    """
    results = []
    
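    for case_id, case_df in df.groupby('case_id'):
        # Sort by time so that .iloc[0] below picks the earliest matching event
        case_df = case_df.sort_values('event_time')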
        # Check if case is marked as critical
        is_critical = case_df['notes'].astype(str).str.contains('critical=true', case=False, na=False).any()
        
        if not is_critical:
            results.append({
                'case_id': case_id,
                'c4_passed': True,
                'c4_detail': 'Not a critical case'
            })
            continue
        
        final_row = case_df[case_df['activity'] == 'Report finalised']
        comm_row = case_df[case_df['activity'] == 'Critical communication']
        
        if len(final_row) == 0:
            results.append({
                'case_id': case_id,
                'c4_passed': False,
                'c4_detail': 'Critical case missing Report finalised'
            })
            continue
        
        if len(comm_row) == 0:
            results.append({
                'case_id': case_id,
                'c4_passed': False,
                'c4_detail': 'Critical case missing Critical communication event'
            })

            continue
        
        final_time = final_row['event_time'].iloc[0]
        comm_time = comm_row['event_time'].iloc[0]
        gap_minutes = (comm_time - final_time).total_seconds() / 60
        
        passed = 0 <= gap_minutes <= threshold_minutes
        results.append({
            'case_id': case_id,
            'c4_passed': passed,
            'c4_detail': f"Communication gap: {gap_minutes:.1f} minutes (threshold: {threshold_minutes})"
        })
    
    return pd.DataFrame(results)
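
Since the choice of `threshold_minutes` needs to be justified, it can help to see how sensitive the pass rate is to it. The following minimal sketch applies the same rule as Method 8 (`0 <= gap <= threshold`) to a few made-up gap values. The numbers are assumptions for illustration only.

```python
import pandas as pd

# Hypothetical gaps (minutes) between 'Report finalised' and 'Critical communication';
# a negative gap means the communication was logged before finalisation
gaps = pd.Series([5.0, 20.0, 45.0, -3.0, 90.0])

pass_rates = {}
for threshold in (15, 30, 60):
    # Same rule as Method 8: 0 <= gap <= threshold (both ends inclusive)
    pass_rates[threshold] = float(gaps.between(0, threshold).mean())

print(pass_rates)
```

Note that the negative gap fails at every threshold. Whether such a case is a true communication failure or a logging-order problem is a separate question worth discussing in your report.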

#### **Method 9: Conformance Check — Modality-Specific Attributes**
- **DQ Dimension**: Accuracy (semantic correctness)
- **Description**: Validates modality-specific data requirements based on DICOM standards and clinical protocols

These requirements reflect clinical imaging protocols and PACS data quality standards.

**Modality Requirements Explained:**

- **DX (Digital Radiography; treated in this log as portable/mobile X-ray)**: Must include patient location (e.g., ward, ICU, emergency) because the equipment moves to the patient. Location is critical for logistics and dose tracking.
- **MR (if implemented)**: Should include field strength, sequence type, contrast timing
- **CR (Computed Radiography)**: Must have minimum series count (≥2 for AP/lateral views) and instance count (≥4 images) to ensure complete diagnostic study. Single-view imaging is typically inadequate for diagnosis.
- **CT (if implemented)**: Should include slice thickness, contrast usage, reconstruction algorithm

In [10]:
def method_9_conformance_modality_attributes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Modality-specific attribute requirements.
    
    Validates that each imaging modality has its required metadata attributes:
    - DX (portable X-ray): location information required
    - CR (computed radiography): minimum series/instance counts for complete study
    
    Facet considerations:
    - Modality codes and required attributes (Data Facet)
    - PACS system should enforce these requirements (System Facet)
    - Different imaging tasks have different clinical data needs (Task Facet)
    """
    results = []
    
    for case_id, case_df in df.groupby('case_id'):
        upload_row = case_df[case_df['activity'] == 'Images uploaded to PACS']
        
        if len(upload_row) == 0:
            results.append({
                'case_id': case_id,
                'passed': False,
                'detail': 'No upload event found'
            })
            continue
        
        upload_row = upload_row.iloc[0]
        modality = upload_row.get('modality', '')
        violations = []
        
        # DX (portable) must have location - critical for mobile equipment tracking
        if modality == 'DX':
            notes = str(upload_row.get('notes', ''))
            if 'location' not in notes.lower():
                violations.append('DX portable missing location information (required for mobile equipment)')
        
        # CR must have sufficient series/instances for complete diagnostic study
        elif modality == 'CR':
            series = upload_row.get('series_count', 0)
            instances = upload_row.get('instance_count', 0)
            if pd.isna(series) or series < 2:
                violations.append(f'CR has insufficient series: {series} (expected >= 2 for AP/lateral views)')
            if pd.isna(instances) or instances < 4:
                violations.append(f'CR has insufficient instances: {instances} (expected >= 4 for complete study)')
        
        results.append({
            'case_id': case_id,
            'passed': len(violations) == 0,
            'detail': '; '.join(violations) if violations else 'All modality requirements met'
        })
    
    return pd.DataFrame(results)

#### **Method 10: Turnaround Time Calculation with Batch Logging Detection**
- **DQ Dimension**: Accuracy
- **Description**: Calculates key turnaround time metrics for workflow stages and flags cases with potential batch logging issues that may distort TAT calculations

In [11]:
def method_10_turnaround_times(df: pd.DataFrame) -> Dict[str, object]:
    """
    Calculate turnaround times for key workflow stages with SIMPLE batch logging detection.
    
    Turnaround times are critical performance metrics but are only
    meaningful if timestamps are accurate. This method flags cases
    where batch logging may have occurred, which would make TAT calculations unreliable.
    
    Returns both TAT statistics and data quality warnings about batch logging.
    """
    tat_results = []
    batch_logging_flags = []
    
    for case_id, case_df in df.groupby('case_id'):
        case_df = case_df.sort_values('event_time')
        times = {}
        
        for activity in ['Order placed', 'Image acquisition start', 'Images uploaded to PACS', 
                         'Case selected', 'Report finalised', 'Report distributed']:
            rows = case_df[case_df['activity'] == activity]
            if len(rows) > 0:
                times[activity] = rows['event_time'].iloc[0]
        
        # Calculate TATs
        tat = {'case_id': case_id}
        
        if 'Order placed' in times and 'Image acquisition start' in times:
            tat['order_to_acquisition_minutes'] = (times['Image acquisition start'] - times['Order placed']).total_seconds() / 60
        
        if 'Images uploaded to PACS' in times and 'Report finalised' in times:
            tat['upload_to_report_minutes'] = (times['Report finalised'] - times['Images uploaded to PACS']).total_seconds() / 60
        
        if 'Order placed' in times and 'Report distributed' in times:
            tat['total_tat_minutes'] = (times['Report distributed'] - times['Order placed']).total_seconds() / 60
        
        # BATCH LOGGING DETECTION for this case
        batch_warnings = []
        
        # Check 1: Multiple events at identical timestamps
        dup_times = case_df[case_df.duplicated('event_time', keep=False)]
        if len(dup_times) > 0:
            batch_warnings.append(f"Duplicate timestamps: {len(dup_times)} events")
        
        # Check 2: Unrealistically short total case duration
        if len(case_df) > 3:
            case_duration_minutes = (case_df['event_time'].max() - case_df['event_time'].min()).total_seconds() / 60
            if case_duration_minutes < 1:
                batch_warnings.append(f"All events within {case_duration_minutes:.2f} min")
            tat['case_duration_minutes'] = case_duration_minutes
        
        # Check 3: Suspiciously short TAT values (likely batch-logged)
        if 'order_to_acquisition_minutes' in tat and tat['order_to_acquisition_minutes'] < 5:
            batch_warnings.append(f"Unrealistic order-to-acquisition: {tat['order_to_acquisition_minutes']:.1f} min")
        
        tat['batch_logging_suspected'] = len(batch_warnings) > 0
        tat['batch_warnings'] = '; '.join(batch_warnings) if batch_warnings else 'None'
        
        tat_results.append(tat)
        
        if len(batch_warnings) > 0:
            batch_logging_flags.append(case_id)
    
    tat_df = pd.DataFrame(tat_results)
    
    # Filter out suspected batch-logged cases for cleaner statistics
    clean_tat_df = tat_df[~tat_df['batch_logging_suspected']] if 'batch_logging_suspected' in tat_df else tat_df
    
    summary = {
        'mean_order_to_acquisition': tat_df['order_to_acquisition_minutes'].mean() if 'order_to_acquisition_minutes' in tat_df else None,
        'mean_upload_to_report': tat_df['upload_to_report_minutes'].mean() if 'upload_to_report_minutes' in tat_df else None,
        'mean_total_tat': tat_df['total_tat_minutes'].mean() if 'total_tat_minutes' in tat_df else None,
        
        # Clean statistics (excluding suspected batch-logged cases)
        'mean_order_to_acquisition_clean': clean_tat_df['order_to_acquisition_minutes'].mean() if 'order_to_acquisition_minutes' in clean_tat_df and len(clean_tat_df) > 0 else None,
        'mean_upload_to_report_clean': clean_tat_df['upload_to_report_minutes'].mean() if 'upload_to_report_minutes' in clean_tat_df and len(clean_tat_df) > 0 else None,
        'mean_total_tat_clean': clean_tat_df['total_tat_minutes'].mean() if 'total_tat_minutes' in clean_tat_df and len(clean_tat_df) > 0 else None,
        
        # Batch logging statistics
        'total_cases': len(tat_df),
        'batch_logging_suspected_count': len(batch_logging_flags),
        'batch_logging_percentage': (len(batch_logging_flags) / len(tat_df) * 100) if len(tat_df) > 0 else 0,
        'batch_logged_cases': batch_logging_flags,
        
        'detailed_data': tat_df,
        'warning': 'TAT calculations may be unreliable for batch-logged cases. Consider using clean statistics or investigating batch-logged cases separately.'
    }
    
    return summary
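
To make the effect of batch-logged cases on turnaround times concrete, the sketch below computes a total TAT on a hypothetical three-case log in which one case has identical timestamps. It uses a pivot-style reshaping instead of the per-case loop in Method 10. The data and variable names are assumptions for illustration.

```python
import pandas as pd

# Hypothetical log: case 'C' has identical timestamps, as a batch-logged case might
toy_log = pd.DataFrame({
    'case_id': ['A', 'A', 'B', 'B', 'C', 'C'],
    'activity': ['Order placed', 'Report distributed'] * 3,
    'event_time': pd.to_datetime([
        '2024-03-01 08:00', '2024-03-01 10:30',
        '2024-03-01 09:00', '2024-03-01 11:00',
        '2024-03-01 17:00', '2024-03-01 17:00',
    ]),
})

# Earliest timestamp per (case, activity), reshaped to one column per activity
first_times = (
    toy_log.sort_values('event_time')
    .groupby(['case_id', 'activity'])['event_time']
    .first()
    .unstack('activity')
)

total_tat = (first_times['Report distributed'] - first_times['Order placed']).dt.total_seconds() / 60
mean_all = total_tat.mean()
mean_without_zero = total_tat[total_tat > 0].mean()
print(mean_all, mean_without_zero)
```

In this toy example a single zero-duration case pulls the mean from 135 down to 90 minutes, which is why comparing the raw and "clean" statistics returned by Method 10 is informative for RQ1.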

#### **Method 11: Workload Distribution Analysis**
- **DQ Dimension**: Representativity & Consistency
- **Description**: Analyses how work is distributed across staff roles

In [12]:
def method_11_workload_distribution(df: pd.DataFrame) -> Dict[str, object]:
    """
    Analyse workload distribution across roles and actors.
    Requires accurate actor and role attribution
    Different roles represent different human workers
    Workload metrics are defined by organisational needs
    """
    results = {}
    
    # Cases per role
    cases_per_role = df.groupby('role')['case_id'].nunique().to_dict()
    results['cases_per_role'] = cases_per_role
    
    # Events per role
    events_per_role = df['role'].value_counts().to_dict()
    results['events_per_role'] = events_per_role
    
    # Cases per individual actor
    cases_per_actor = df.groupby('actor')['case_id'].nunique().sort_values(ascending=False).to_dict()
    results['cases_per_actor'] = cases_per_actor
    
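    # Identify potential data quality issues: flag actors whose number of
    # distinct cases lies more than 2 standard deviations from the mean
    results['workload_outliers'] = {}  # default so the key always exists
    actor_counts = df.groupby('actor')['case_id'].nunique()
    if len(actor_counts) > 0:
        mean_cases = actor_counts.mean()
        std_cases = actor_counts.std()  # NaN for a single actor, so nothing is flagged
        outliers = actor_counts[(actor_counts > mean_cases + 2*std_cases) | (actor_counts < mean_cases - 2*std_cases)]
        results['workload_outliers'] = outliers.to_dict()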
    
    return results
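
The two-standard-deviation rule used for `workload_outliers` can behave unexpectedly when there are only a few actors. The sketch below isolates that rule and applies it to two toy logs. The helper names (`flag_workload_outliers`, `make_toy_log`), the actor labels, and the case counts are all invented for illustration and are not part of the assignment dataset or toolkit. Because `Series.std()` uses the sample standard deviation, a single extreme value among n actors can reach a z-score of at most (n-1)/√n, which is below 2 when n is 5 or less.

```python
import pandas as pd

def flag_workload_outliers(df: pd.DataFrame, k: float = 2.0) -> dict:
    """Flag actors whose distinct-case count is more than k sample
    standard deviations away from the mean (same rule as Method 11)."""
    counts = df.groupby('actor')['case_id'].nunique()
    mean, std = counts.mean(), counts.std()
    mask = (counts > mean + k * std) | (counts < mean - k * std)
    return counts[mask].to_dict()

def make_toy_log(cases_per_actor: dict) -> pd.DataFrame:
    """Build a hypothetical log with one event per case (illustration only)."""
    rows = []
    next_case = 0
    for actor, n_cases in cases_per_actor.items():
        for _ in range(n_cases):
            rows.append({'case_id': f'C{next_case}', 'actor': actor})
            next_case += 1
    return pd.DataFrame(rows)

# Five actors: one handles 20 times more cases than the others
small = make_toy_log({'A1': 5, 'A2': 5, 'A3': 5, 'A4': 5, 'A5': 100})

# Ten actors: same extreme actor, but more "typical" actors around it
large = make_toy_log({**{f'A{i}': 5 for i in range(1, 10)}, 'A10': 100})

print("5 actors :", flag_workload_outliers(small))   # nothing is flagged
print("10 actors:", flag_workload_outliers(large))   # the extreme actor is flagged
```

An empty `workload_outliers` result therefore does not by itself show that workload is evenly distributed; it may only reflect a small number of actors.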

---
## Part 3: Method Selection and Justification

### Task: From the methods above (Methods 1-11), choose 4-5 methods to run.

Complete this table in your main report.

| Method | DQ Dimension | Primary Facets | Relevant to RQ(s) | Why did you choose this? | Expected findings |
|--------|--------------|----------------|-------------------|--------------------------|-------------------|
| Example: Method 1 | | | | | |

Why did you NOT choose the other methods? What trade-offs did you consider? How did the three research questions (RQs) influence your method selection?

## Part 4: Running Your Chosen Methods

Now run each of your chosen methods. For each method:
1. Run the code
2. Display key results
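
Some methods return dictionaries that mix scalars, long lists, and whole DataFrames (for example a `detailed_data` entry), so printing the raw dictionary can be hard to read. The following is one possible helper for displaying key results compactly. The name `summarise_results` and the demo values are assumptions made for illustration; they are not part of the provided toolkit or the dataset.

```python
import pandas as pd

def summarise_results(results: dict, max_items: int = 5) -> dict:
    """Return a compact, printable view of a results dictionary."""
    summary = {}
    for key, value in results.items():
        if isinstance(value, (pd.DataFrame, pd.Series)):
            # Show only the type and shape of tabular results
            summary[key] = f"<{type(value).__name__} shape={value.shape}>"
        elif isinstance(value, dict):
            # Keep the first few entries and note how many were omitted
            head = dict(list(value.items())[:max_items])
            if len(value) > max_items:
                head['...'] = f"{len(value) - max_items} more"
            summary[key] = head
        elif isinstance(value, (list, tuple, set)):
            items = list(value)
            shown = items[:max_items]
            if len(items) > max_items:
                shown.append(f"... {len(items) - max_items} more")
            summary[key] = shown
        elif isinstance(value, float):
            summary[key] = round(value, 2)
        else:
            summary[key] = value
    return summary

# Hypothetical results dictionary (values invented for illustration)
demo = {
    'total_cases': 40,
    'mean_total_tat': 123.4567,
    'batch_logged_cases': ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7'],
    'detailed_data': pd.DataFrame({'case_id': ['C1', 'C2', 'C3'],
                                   'total_tat_minutes': [100.0, 120.0, 150.5]}),
}

for key, value in summarise_results(demo).items():
    print(f"{key}: {value}")
```

The full objects remain available in the original dictionary for deeper inspection or plotting.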

**→ Reflection Question 3** (answer in your main report):  
Provide detailed interpretation of results from each chosen method:
- Include key results.
- Describe and interpret the results. Did they work as expected? Did they reveal any surprising insights? Could they be improved?
- Were your expectations correct?
- Which facet(s) does this result primarily inform about?


In [None]:
# TODO: Run your first chosen method
# Example:
# results_1 = method_1_schema_validation(df)
# print("=== Method 1: Schema Validation ===")
# for check, passed in results_1.items():
#     print(f"  {check}: {'✓ PASS' if passed else '✗ FAIL'}")

# YOUR CODE HERE

In [None]:
# TODO: Run your second chosen method
# YOUR CODE HERE

In [None]:
# TODO: Run your third chosen method
# YOUR CODE HERE

In [None]:
# TODO: Run your fourth chosen method
# YOUR CODE HERE

In [None]:
# TODO: Run your fifth chosen method (if applicable)
# YOUR CODE HERE

## Part 5: Manual Exploration — Propose Your Own DQ Dimension

Based on your exploration, identify a DQ concern NOT adequately covered by the provided methods.

**Task**:
1. Manually explore the data to identify a quality issue
2. Name and define a DQ dimension that captures this issue
3. Map it to the Five Facets framework
4. Write code to demonstrate the issue
5. Propose an assessment approach
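
As an illustration of step 4, the sketch below shows one way to demonstrate a naming-consistency issue: it groups raw actor names that become identical after simple normalisation. It assumes the log has an `actor` column (as used in Method 11); the helper names and the toy names are hypothetical, and the normalisation rule (lower-casing, removing punctuation, collapsing whitespace) is only one of many possible choices.

```python
import re
import pandas as pd

def normalise_actor(name: str) -> str:
    """Lower-case, drop punctuation, and collapse repeated whitespace."""
    cleaned = re.sub(r'[^\w\s]', '', str(name).lower())
    return re.sub(r'\s+', ' ', cleaned).strip()

def find_actor_name_variants(df: pd.DataFrame, actor_col: str = 'actor') -> dict:
    """Map each normalised name to its raw spellings,
    keeping only names that appear with more than one spelling."""
    variants = {}
    for raw in df[actor_col].dropna().astype(str).unique():
        variants.setdefault(normalise_actor(raw), set()).add(raw)
    return {key: sorted(spellings)
            for key, spellings in variants.items()
            if len(spellings) > 1}

# Toy data, invented for illustration only
toy = pd.DataFrame({'actor': ['Dr. Smith', 'Dr Smith', 'dr smith ',
                              'Nurse Lee', 'Nurse Lee']})

print(find_actor_name_variants(toy))
```

A match only suggests that two spellings may refer to the same person; confirming this would require knowledge of the source system or the staff list.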

In [None]:
# TODO: Explore the data to find an uncovered quality issue
# Suggestions:
# - Temporal patterns (e.g., clustered or unusually timed events)
# - Activity duration distributions (implausibly short/long durations)
# - Cross-case patterns (same error repeated across cases)
# - Consistency of actor names (typos, variations like "Dr. Smith" vs "Dr Smith")

### 5.1 Your Proposed DQ Dimension

**→ Reflection Question 4** (answer in your main report):  
Expand on your proposed DQ dimension. 
- Define your dimension.
- *What specific pattern or issue in the data motivated this dimension? Reference your exploration code above. Provide concrete examples.*
- *Which research question(s) would be affected by this quality issue? How?*
- Map the dimension's relevance across the five facets.

**Facet Mapping**:

| Facet | Involvement (++, +, -) | Justification |
|-------|------------------------|---------------|
| Data | | |
| Source | | |
| System | | |
| Task | | |
| Human | | |


In [None]:
# TODO: Write a function demonstrating assessment of your proposed dimension
# This doesn't need to be complete, but should show the core logic

def my_custom_dq_method(df: pd.DataFrame):
    """
    Assess [YOUR DIMENSION NAME].
    
    YOUR DESCRIPTION
    """
    results = {}
    
    # YOUR CODE HERE
    
    return results

# Run your method
# custom_results = my_custom_dq_method(df)
# print("=== My Custom DQ Assessment ===")
# Display results

## Part 6: Per-Facet Data Quality Synthesis

**→ Reflection Question 5** (answer in your main report):  
For each of the five data quality facets, interpret the results of the methods you ran and assess the quality of the dataset. You can use these questions as guidance:
- What did your methods reveal about data quality at \[a facet\] level?
- What are the main data quality issues?
- What could the consequences be for the RQs?

**Then discuss:**
a) Which facet was most challenging to assess with the available methods? Why?  
b) How else could these data quality facets be assessed?

## Part 7: Invisible Work — Comparing Log to Workflow Diagram

The assignment document includes a workflow diagram showing the actual radiology process with informal communication (orange dashed lines).

**Task**: Identify gaps between the event log and the workflow diagram.
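
One possible way to structure this comparison is a set difference between the activities found in the log and a list you transcribe by hand from the workflow diagram. The sketch below assumes the activity column is called `activity`, which may differ in your dataset. The function name and all activity labels in the toy example are hypothetical placeholders, not the actual contents of the log or the diagram.

```python
import pandas as pd

def compare_activities(df: pd.DataFrame, diagram_activities, activity_col: str = 'activity') -> dict:
    """Compare logged activities with a hand-transcribed list from the diagram."""
    logged = set(df[activity_col].dropna().unique())
    diagram = set(diagram_activities)
    return {
        'in_diagram_not_in_log': sorted(diagram - logged),  # candidate invisible work
        'in_log_not_in_diagram': sorted(logged - diagram),
        'in_both': sorted(diagram & logged),
    }

# Toy log and diagram list, invented for illustration only
toy_log = pd.DataFrame({'activity': ['Order Created', 'Image Acquired',
                                     'Report Finalised', 'Image Acquired']})
toy_diagram = ['Order Created', 'Image Acquired', 'Report Finalised',
               'Phone call to clinician']

gaps = compare_activities(toy_log, toy_diagram)
for label, activities in gaps.items():
    print(f"{label}: {activities}")
```

Exact string matching will miss activities that are named differently in the log and the diagram, so the output is a starting point for manual comparison rather than a final answer.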

In [None]:
# TODO: List all unique activities in your event log
print("=== Activities in Event Log ===")
# YOUR CODE HERE

# Compare these with activities/interactions shown in the workflow diagram
# Make notes on what's missing

### 7.1 Invisible Work Analysis

Complete the table with at least **3 activities** from the workflow diagram that do NOT appear in the log:

| Activity from Diagram | Why not logged? | Which facet explains this? | Consequences | How could it be captured? |
|----------------------|-----------------|---------------------------|--------------|---------------------------|
| | | | | |
| | | | | |
| | | | | |

---

**→ Reflection Question 6** (answer in your main report):  
a) For each activity, explain why it might not be logged (technical, social, economic reasons).  
b) Reflect on how a process model built *only* from the event log would fail to capture these activities. What are the potential consequences for system design, with regard to each of the research questions?  
c) How does your Part I experience (interview vs. observation) parallel the gap between event log and workflow diagram?  
d) What does the gap between logged data and actual practice tell us about building healthcare IT systems from "data-driven" approaches alone?  
e) Propose one concrete change to the logging system that would capture currently invisible work. Draw on concepts from Chapter 9 (Formal and informal information systems) to discuss trade-offs: Why is complete logging difficult or even unwelcome in real-world settings? How could negative impacts be mitigated?