<a href="https://colab.research.google.com/github/djdunc/hercules/blob/main/data_processing/locations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Takes patient journey data and adds in machine location information based on a set of rectangles describing where machines are located.

In [1]:
import pandas as pd

def filter_patient_data(csv_file, patient_id):
    """
    Loads a CSV file into a pandas DataFrame and filters it by patient ID.

    Args:
        csv_file (str): Path to the CSV file.
        patient_id (str): The patient ID to filter by.

    Returns:
        pandas.DataFrame: A DataFrame containing only the data for the specified
                         patient ID, or an empty DataFrame if the file doesn't exist
                         or the patient ID is not found. Returns an error message if the file is not a valid CSV.
    """
    try:
        df = pd.read_csv(csv_file)
    except FileNotFoundError:
        return "Error: File not found."
    except pd.errors.ParserError:
        return "Error: Invalid CSV file format."
    except Exception as e: # Catch other potential pandas exceptions
        return f"An error occurred while reading the file: {e}"


    if 'Patient' not in df.columns:
        return "Error: 'Patient' column not found in CSV."

    filtered_df = df[df['Patient'] == patient_id]
    return filtered_df




In [2]:
def is_location_within_rectangle(x, y, rect):
    """Checks if (x, y) is within a given rectangle.

    Args:
        x: x-coordinate.
        y: y-coordinate.
        rect: A tuple or list defining the rectangle (x1, y1, x2, y2).

    Returns:
        bool: True if within, False otherwise.
    """
    x1, y1, x2, y2 = rect
    return x1 <= x <= x2 and y2 <= y <= y1  # Corrected y-coordinate comparison


In [4]:
rectangles = {
    "Reception": (0, 14.9, 8.38, 6.31),
    "WC": (24.95, 3.33, 27.67, 1.66),
    "MR 1": (25.95, 12.77,28.24, 9.30),
    "MR 2": (23.51, 12.77,25.80, 9.30),
    "MR 3": (17.28, 15.93,20.95, 13.63),
    "MR 4": (13.46, 15.96,17.13, 13.63),
    "AF LHS": (2.67, 6.08,4.76, 3.87),
    "AF RHS": (4.94, 6.06,7.03, 3.87),
    "HFA 1": (0.00, 3.96,2.30, 2.55),
    "HFA 2": (0.14, 2.32,1.66, 0.14),
    "HFA 3": (2.38, 2.41,3.87, 0.17),
    "HFA 4": (4.10, 2.41,5.60, 0.14),
    "HFA 5": (5.85, 2.41,7.32, 0.14),
    "HFA 6": (7.61, 2.41,9.10, 0.17),
    "Vision 1": (23.76, 7.43,25.57, 3.67),
    "Vision 2": (25.74, 7.40,27.52, 3.76),
    "Vision 3": (30.28, 7.38,34.01, 5.63),
    "Vision 4": (30.28, 9.24,34.01, 7.58),
    "Vision 5": (30.28, 11.08,34.04, 9.44),
    "Vision 6": (30.25, 13.00,34.01, 11.31),
    "Vision 7": (30.28, 14.92,34.01, 13.14),
    "MR Dilation 1": (29.79, 16.02,30.94, 15.04),
    "MR Dilation 2": (31.51, 16.02,32.63, 15.07),
    "MR Dilation 3": (33.09, 15.99,34.15, 15.10),
    "MR Dilation 4": (33.01, 17.77,34.10, 16.68),
    "MR Dilation 5": (31.05, 17.77,32.14, 16.68),
    "MR Dilation 6": (29.19, 17.77,30.28, 16.62),
    "MR Dilation 7": (27.52, 17.71,28.59, 16.68),
    "Research 1": (8.73, 5.74,12.43, 3.82),
    "Research 2": (8.67, 8.44,12.28, 6.63),
    "Research 3": (8.67, 10.45,12.31, 8.64),
    "Research 4": (8.67, 12.46,12.34, 10.62),
    "Research 5": (8.64, 14.49,12.34, 12.63),
    "Research 6": (12.74, 18.20,16.50, 16.36),
    "Glaucoma 1 ORA": (17.88, 6.40,21.58, 3.93),
    "Glaucoma 1 OCT": (15.64, 6.40,17.77, 3.93),
    "Glaucoma 1 WF": (13.60, 6.37,15.50, 3.93),
    "Glaucoma 2 ORA": (17.88, 9.04,21.61, 6.57),
    "Glaucoma 2 OCT": (15.64, 9.01,17.77, 6.57),
    "Glaucoma 2 WF": (13.63, 9.07,15.53, 6.60),
    "Glaucoma 3 ORA": (17.91, 11.62,21.61, 9.21),
    "Glaucoma 3 OCT": (15.70, 11.65,17.79, 9.24),
    "Glaucoma 3 WF": (13.63, 11.65,15.53, 9.21)
    }



In [22]:
def process_tasks(task_list_csv, rectangles, output_csv_path="combined_output.csv"):
    """
    Processes tasks from a task list CSV, appends results, and sorts by 'starttime'.

    Args:
        task_list_csv (str): Path to the CSV containing task information.
        rectangles: A dictionary where keys are rectangle names (strings) and
                    values are tuples/lists defining the rectangles (x1, y1, x2, y2).
        output_csv_path (str): The path to the output CSV file.
    """
    try:
        tasks_df = pd.read_csv(task_list_csv)
        #print("CSV Column Names:", tasks_df.columns.tolist())
    except FileNotFoundError:
        print(f"Error: Task list CSV not found at {task_list_csv}")
        return
    except Exception as e:
        print(f"Error reading task list CSV: {e}")
        return

    print(tasks_df)

    all_data = pd.DataFrame()  # Initialize an empty DataFrame

    for index, row in tasks_df.iterrows():
        file_path = row['csv_file_path']
        patient_id = row['patient_id_to_filter']
        unique_patient_id = row['Unique_Patient_ID'] # this is the one used by Rosica

        filtered_data = filter_patient_data(file_path, patient_id)

        if isinstance(filtered_data, pd.DataFrame):
            if not filtered_data.empty:
                try:
                    filtered_data[['xlocation', 'ylocation']] = filtered_data['Location'].apply(lambda loc: tuple(map(float, loc.replace('"', '').split(',')))).tolist()
                    filtered_data['starttime'] = pd.to_datetime(filtered_data['starttime'])

                except (ValueError, AttributeError):
                    print(f"Error processing {file_path}: Could not convert xlocation or ylocation to floats. Check data format.")
                    continue

                filtered_data['within_rectangle'] = ""

                for rect_name, rect_coords in rectangles.items():
                    filtered_data.loc[filtered_data.apply(lambda row: is_location_within_rectangle(row['xlocation'], row['ylocation'], rect_coords), axis=1), 'within_rectangle'] += rect_name + ","

                filtered_data['within_rectangle'] = filtered_data['within_rectangle'].str.rstrip(',')
                filtered_data['Unique_Patient_ID'] = unique_patient_id # Add Unique_Patient_ID


                all_data = pd.concat([all_data, filtered_data], ignore_index=True)

            else:
                print(f"No data found for Patient ID: {patient_id} in file: {file_path}")
        elif isinstance(filtered_data, str):
            print(f"Error processing {file_path}: {filtered_data}")
        else:
            print(f"An unexpected error occurred while processing {file_path}")

    if not all_data.empty:
        all_data = all_data.sort_values(by='starttime')
        all_data.to_csv(output_csv_path, index=False)
        print(f"Combined data saved to {output_csv_path}")
    else:
        print("No data was processed.")




In [None]:

task_list_csv = "ethno_patients.csv"
process_tasks(task_list_csv, rectangles, "combined_data.csv")



# Extra

process_data was used when processing single files

In [11]:
def process_data(csv_file_path, patient_id_to_filter, rectangles, output_csv_path):
    """Processes data, checking against multiple rectangles.

    Args:
        csv_file_path: Path to CSV.
        patient_id_to_filter: Patient ID.
        rectangles: A dictionary where keys are rectangle names (strings) and
                    values are tuples/lists defining the rectangles (x1, y1, x2, y2).
        output_file_path: Path to output text file.
    """

    filtered_data = filter_patient_data(csv_file_path, patient_id_to_filter)

    if isinstance(filtered_data, pd.DataFrame):
        if not filtered_data.empty:
            try:
                filtered_data[['xlocation', 'ylocation']] = filtered_data['Location'].apply(lambda loc: tuple(map(float, loc.replace('"', '').split(',')))).tolist()
                #Convert starttime to datetime
                filtered_data['starttime'] = pd.to_datetime(filtered_data['starttime'])

            except (ValueError, AttributeError):
                return "Error: Could not convert xlocation or ylocation to floats. Check data format."

            filtered_data['within_rectangle'] = ""  # Initialize the column

            for rect_name, rect_coords in rectangles.items():
                filtered_data.loc[filtered_data.apply(lambda row: is_location_within_rectangle(row['xlocation'], row['ylocation'], rect_coords), axis=1), 'within_rectangle'] += rect_name + "," # Append the rectangle name, comma separated.

            #Remove the trailing comma
            filtered_data['within_rectangle'] = filtered_data['within_rectangle'].str.rstrip(',')

            # Sort by starttime
            filtered_data = filtered_data.sort_values(by='starttime')
            print(filtered_data.to_string())
            # Save to CSV
            filtered_data.to_csv(output_csv_path, index=False)  # index=False prevents writing the index column



        else:
            print(f"No data found for Patient ID: {patient_id_to_filter}")
    elif isinstance(filtered_data, str):
        print(filtered_data)
    else:
        print("An unexpected error occurred.")



Extra bit of code to loop through all the data to see which records are not in any of the input files - takes a few mins to run

In [27]:
import os

def check_patient_id_files(task_list_csv, input_files, output_file="patient_id_file_mapping.txt"):
    """
    Checks if patient IDs from a task list CSV are present in specified input CSV files.

    Args:
        task_list_csv (str): Path to the task list CSV.
        input_files (list): List of paths to the input CSV files.
        output_file (str): Path to the output text file.
    """

    try:
        tasks_df = pd.read_csv(task_list_csv)
    except FileNotFoundError:
        print(f"Error: Task list CSV not found at {task_list_csv}")
        return
    except Exception as e:
        print(f"Error reading task list CSV: {e}")
        return

    results = [] # list to store results

    for index, row in tasks_df.iterrows():
        patient_id = row['patient_id_to_filter']
        found_in_files = []

        for input_file in input_files:
            try:
                df = pd.read_csv(input_file)
                if patient_id in df['Patient'].values: # Check if patient_id is present
                    found_in_files.append(os.path.basename(input_file)) # Append file name to the list
            except FileNotFoundError:
                print(f"Warning: Input file not found: {input_file}")
            except Exception as e:
                print(f"Warning: Error reading input file {input_file}: {e}")

        if found_in_files:
            results.append(f"Patient ID: {patient_id} found in: {', '.join(found_in_files)}")
        else:
            results.append(f"Patient ID: {patient_id} not found in any specified files.")

    # Write results to output file
    with open(output_file, "w") as f:
        for result in results:
            f.write(result + "\n")

    print(f"Patient ID to file mapping written to {output_file}")

# Example Usage:
task_list_csv = "ethno_patients.csv"  # Replace with your task list CSV path
input_files = ["P1_input.csv", "P2_input.csv", "P3_input.csv", "P4_input.csv"] # list of input files

check_patient_id_files(task_list_csv, input_files, "patient_file_mapping.txt") # specify output file

Patient ID to file mapping written to patient_file_mapping.txt
