# Sync subject and session IDs on Flywheel with local dataframe
Niall Bourke  
14-11-24   

IDs for the same subject or session can differ between data sources because of typos or inconsistent use of '_', '-' or ' '.

Where possible, these should be aligned to assist with future analysis. The custom-information-sync gear pulls a list of subject_ids and session_ids for a project EXACTLY as they appear on Flywheel (FW). You can use this list to align another data source so that it carries the IDs expected on FW. When the new data is then uploaded, each row goes to the correct matching subject and session.
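
As a rough illustration of how such variants can be reconciled, the sketch below maps each ID to a comparison key by lower-casing it and removing '_', '-' and spaces. The function name `normalise_id` and the example IDs are hypothetical. The key is meant for matching only; the ID written back to the file should always be the exact FW spelling.

```python
import re

def normalise_id(raw_id):
    """Return a comparison key: lower-case with '_', '-' and whitespace removed."""
    return re.sub(r"[\s_-]+", "", str(raw_id)).lower()

# Hypothetical IDs: the same subject written three different ways
fw_id = "SUB-001"
local_ids = ["sub_001", "Sub 001", "SUB001"]

# Every variant collapses to the same key as the Flywheel ID
matches = [normalise_id(i) == normalise_id(fw_id) for i in local_ids]
print(matches)
```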

**Prerequisite**
Run the 'custom-information-sync' gear

**Usage**
Go through the following steps:
- Look for a recent run of custom-information-sync  
- Download output of this gear to the local Jupyter environment   
- Upload the dataframe you want to sync to the Jupyter environment using the upload button  
- Run ID syncing cell (this formats the new variables to have the expected subject & session IDs)  
- QC the updated file  
- Rerun 'custom-information-sync' with this new file as input to sync the info to the matching sessions  
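
For the QC step, one possible check is to flag local IDs that have no exact match on FW and suggest the closest FW spelling. This sketch uses `difflib` from the Python standard library rather than fuzzywuzzy; the function name `suggest_matches`, the 0.8 cutoff and the IDs are illustrative assumptions.

```python
from difflib import get_close_matches

def suggest_matches(local_ids, fw_ids, cutoff=0.8):
    """Map each local ID that is not on FW to its closest FW ID (or None)."""
    fw_set = set(fw_ids)
    suggestions = {}
    for local_id in local_ids:
        if local_id in fw_set:
            continue  # exact match, nothing to fix
        close = get_close_matches(local_id, fw_ids, n=1, cutoff=cutoff)
        suggestions[local_id] = close[0] if close else None
    return suggestions

# Hypothetical IDs: one exact match, one near miss, one unknown
fw_ids = ["SUB-001", "SUB-002", "SUB-003"]
local_ids = ["SUB-001", "SUB_002", "XYZ"]
print(suggest_matches(local_ids, fw_ids))
```

Suggestions of `None` need manual review. Any suggested match should be checked by eye before it is accepted.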

In [None]:
! pip install fuzzywuzzy
! pip install python-Levenshtein

In [None]:
from pathlib import Path
import os
import pandas as pd
from datetime import datetime
# from fuzzywuzzy import process

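home = os.getcwd()
print(home)
# Folder used for downloaded and uploaded files; created if it does not exist yet
data_path = Path(home) / 'Data'
data_path.mkdir(exist_ok=True)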

### Step 2: Selecting and Downloading the Latest Gear Run Output in Flywheel

This script retrieves the output of the latest (or second latest) run of a specified Flywheel "gear" (processing tool), making it available for local analysis within a Jupyter environment. Here’s an overview of the process:

1. **Set Project and Analysis Context**: The code starts by defining the project and retrieving all analyses associated with it.

2. **Define Gear Name**: The specific gear name (in this case, `custom-information-sync`) is set to filter analyses. This ensures that only runs using this gear are considered.

3. **Filter Analyses by Gear Name**: The script filters through all analyses to find those that match the specified gear name. This filtering is done in a case-insensitive manner.

4. **Retrieve Latest Gear Runs**: From the filtered list, it identifies the two most recent gear runs:
   - **Latest Gear Run**: The most recent analysis run that matches the specified gear.
   - **Second Latest Gear Run**: The second most recent run, used as a backup if the latest run is missing the required files.

5. **Check for Files in the Latest Gear Run**: The script checks if the latest run has an output file:
   - If the file is found, it proceeds with the latest run.
   - If no file is found, it defaults to the second most recent run and logs a message.

6. **Download the File Locally**: Once the appropriate file object is identified, it downloads the file to a specified directory within the Jupyter environment, ready for further processing.

This script ensures that the most recent data from the specified gear run is readily accessible, with a fallback to a previous run if necessary.
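
The selection logic in steps 3 to 5 can be sketched without a Flywheel connection. The example below uses plain dictionaries as stand-ins for analysis objects and sorts them by a `created` timestamp before picking a run, rather than relying on list order. The `created` and `files` keys and the function name `pick_run_with_files` are assumptions for illustration, not a confirmed description of the SDK's data model.

```python
from datetime import datetime

def pick_run_with_files(analyses, gear_name):
    """Return the newest run of gear_name that has output files, checking at most the two latest."""
    wanted = gear_name.strip().casefold()
    runs = [
        run for run in analyses
        if run.get("gear_info", {}).get("name", "").strip().casefold() == wanted
    ]
    # Sort newest first so the result does not depend on the order of the input list
    runs.sort(key=lambda run: run["created"], reverse=True)
    for run in runs[:2]:
        if run.get("files"):
            return run
    return None

# Stand-in data: the latest matching run produced no output, so the previous one is chosen
analyses = [
    {"gear_info": {"name": "custom-information-sync"}, "created": datetime(2024, 11, 12), "files": ["a.csv"]},
    {"gear_info": {"name": "Custom-Information-Sync "}, "created": datetime(2024, 11, 14), "files": []},
    {"gear_info": {"name": "other-gear"}, "created": datetime(2024, 11, 13), "files": ["b.csv"]},
]
chosen = pick_run_with_files(analyses, "custom-information-sync")
print(chosen["files"])
```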


In [None]:
project = fw_project
analyses = project.analyses

# Specify the gear name to search for
gear='custom-information-sync'
gear_to_find = gear.strip()  # Assuming 'gear' is the gear name you're looking for

# Filter the analyses by the gear name
filtered_gear_runs = [
    run for run in analyses
    if run.get('gear_info', {}).get('name', '').strip().casefold() == gear_to_find.casefold()
]

# Get the most recent and second most recent gear runs.
# NOTE: this assumes project.analyses is ordered from oldest to newest.
if not filtered_gear_runs:
    raise RuntimeError(f"No runs of '{gear_to_find}' found. Run the gear first.")
latest_gear_run = filtered_gear_runs[-1]
second_latest_gear_run = filtered_gear_runs[-2] if len(filtered_gear_runs) > 1 else None

# Attempt to get the file object from the latest gear run
file_object = latest_gear_run.files
run_used = "latest"

# Fall back to the second most recent run if the latest has no output files
if not file_object and second_latest_gear_run is not None:
    file_object = second_latest_gear_run.files
    run_used = "second most recent"

if not file_object:
    raise RuntimeError(f"No output file found in recent runs of '{gear_to_find}'")
print(f"Using {run_used} run. File object: {file_object[0].name}")

# Pull the file into the Jupyter environment
file_str = file_object[0].name
download_path = Path(home) / "Data" / file_str
file_object[0].download(download_path)


### Step 3: Syncing Local Data with Flywheel Session Information

This script combines local demographic or variable data with session data from a Flywheel project, aligning information for seamless synchronization. Here’s an overview of its key steps:

1. **Define File Paths**: The script defines file paths for accessing the local data (e.g., a demographics CSV file) and Flywheel session information CSV.

2. **Import Data**: It loads the local data (`df_local`) and Flywheel session data (`df_fw`) into separate dataframes. These dataframes contain information that will later be merged, such as participant demographics and session details.

3. **Align Identifiers**: The script renames columns in `df_local` to match those in Flywheel (`df_fw`). This alignment ensures that subject IDs and session IDs are consistent, enabling the data to be matched accurately.

4. **Merge Dataframes**: The `merge()` function combines `df_fw` and `df_local` based on shared identifiers (e.g., `subject_id`), allowing demographic data to be added to the Flywheel session data.

5. **Replace Data Where Needed**: For each relevant column, the script replaces Flywheel values with those from the local data if they are available. This allows updated or additional information from the local dataset to overwrite existing values in Flywheel data.

6. **Clean Up Columns**: After merging, the script removes unnecessary columns (suffixes created by merging, like `_fw` and `_local`) to keep the dataframe clean and organized.

7. **Save the Merged Dataframe**: Finally, the script saves the merged and cleaned dataframe back to a CSV file, ready for uploading or further analysis.

This process enables efficient synchronization of locally stored data with Flywheel, ensuring that session information is accurate and up-to-date.
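
The merge-and-replace logic in steps 4 to 6 can be tried on toy data first. The sketch below uses made-up values and adds pandas' `indicator=True` option, which the notebook cell itself does not use, to list subjects that found no match in the local file.

```python
import pandas as pd

# Toy stand-ins for the Flywheel export and the local file (all values are made up)
df_fw = pd.DataFrame({
    "subject_id": ["S1", "S2"],
    "session_id": ["ses-1", "ses-1"],
    "age_months": [10.0, 12.0],
})
df_local = pd.DataFrame({
    "subject_id": ["S1"],
    "age_months": [11.0],
})

# indicator=True adds a '_merge' column showing which rows found a local match
merged = df_fw.merge(df_local, on="subject_id", how="left",
                     suffixes=("_fw", "_local"), indicator=True)

# Prefer the local value where present, otherwise keep the Flywheel value
merged["age_months"] = merged["age_months_local"].combine_first(merged["age_months_fw"])
merged = merged.drop(columns=["age_months_fw", "age_months_local"])

# Subjects on Flywheel that have no row in the local file
unmatched = merged.loc[merged["_merge"] == "left_only", "subject_id"].tolist()
print(merged["age_months"].tolist(), unmatched)
```

A non-empty `unmatched` list is a prompt to check those IDs for typos before uploading.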


In [None]:

# Define file path for local data (replace NEWDATA.csv with the name of your uploaded file).
# Uses the same Data folder the gear output was downloaded to.
data_dir = Path(home) / 'Data'
f = data_dir / 'NEWDATA.csv'

# Import local dataframe with demographics or variables you want to sync to Flywheel project
df_local = pd.read_csv(f)

# Import session information dataframe from Flywheel
df_fw = pd.read_csv(data_dir / file_object[0].name)

# NOTE: Align subject IDs between dataframes. If the IDs don't align, Flywheel will not be able to assign the variables to the correct sessions
df_local = df_local.rename(columns={"studyid": "subject_id", 
                                    "age_months_at_scan": "age_months"})

# Merge the dataframes on subject_id only; the left merge keeps every Flywheel session
merged_df = df_fw.merge(df_local, on=["subject_id"], how="left", suffixes=('_fw', '_local'))

# Only columns present in BOTH dataframes receive the '_fw' / '_local' suffixes.
# Columns unique to df_local keep their name and need no replacement.
columns_to_replace = [
    col for col in df_local.columns
    if col not in ["subject_id", "session_id"] and col + "_local" in merged_df.columns
]

for column in columns_to_replace:
    # If the local data has a value, use it; otherwise, keep the original df_fw value
    merged_df[column] = merged_df[column + "_local"].combine_first(merged_df[column + "_fw"])

# Drop the unnecessary '_fw' and '_local' columns created by the merge
merged_df.drop(columns=[col + "_fw" for col in columns_to_replace] + [col + "_local" for col in columns_to_replace], inplace=True)

# Keep Flywheel's session_id (it is only suffixed when df_local also has a session_id column)
merged_df = merged_df.rename(columns={'session_id_fw': 'session_id'})
merged_df = merged_df.drop(columns=['session_id_local'], errors='ignore')

# Write the merged dataframe back to CSV
merged_df.to_csv(file_str, index=False, encoding="utf-8")


### Step 4: Uploading a File and Running a Gear Analysis in Flywheel

This final script cell uploads a processed file to Flywheel and initiates a new analysis job using a specified gear. Here’s an overview of the steps:

1. **Upload the File to Flywheel**: The script uploads the prepared file (`file_str`) to the Flywheel project. It stores the uploaded file as `input_file`, which serves as an input reference for the gear analysis.

2. **Select the Gear**: The `custom-information-sync` gear is retrieved from Flywheel, which will process the uploaded file.

3. **Prepare Job Inputs**: The script sets up an `inputs` dictionary where `session_info` is linked to the uploaded file. This dictionary will pass the file as input to the gear.

4. **Define Destination and Label for Analysis**: The analysis destination is set to the project level. The script generates a unique `analysis_label` using the current date and time, ensuring each run is clearly labeled.

5. **Run the Gear Analysis Job**:
   - The script calls the gear’s `run` function, providing the label, inputs, destination, and an empty configuration.
   - It stores the job ID in `job_list` and logs a message indicating that the job has been submitted.

6. **Handle Errors**: If an error occurs during job submission, the script catches the exception and prints a warning with the specific error message.

This code cell completes the workflow by initiating an analysis job in Flywheel, using the uploaded file as input, and tagging it for easy tracking.
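
The label from step 4 can be sketched as follows. `build_analysis_label` is a hypothetical helper; the timestamp format mirrors the `%d-%m-%Y_%H-%M-%S` pattern used by `formatted_timestamp()` in this notebook.

```python
from datetime import datetime

def build_analysis_label(tag, now=None):
    """Return '<tag>_<dd-mm-YYYY_HH-MM-SS>'; `now` can be passed in for reproducible output."""
    now = now or datetime.now()
    return f"{tag}_{now.strftime('%d-%m-%Y_%H-%M-%S')}"

# A fixed datetime makes the result predictable
label = build_analysis_label("UPLOAD", datetime(2024, 11, 14, 9, 30, 5))
print(label)  # UPLOAD_14-11-2024_09-30-05
```

Because the timestamp has one-second resolution, two jobs submitted within the same second would share a label.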


In [None]:
# Get the Flywheel File object using the file path
project = fw_project
upload_result = project.upload_file(file_str)  # Upload the file to Flywheel and get the file object
input_file = upload_result[0]

gear =  fw.lookup('gears/custom-information-sync')
analysis_tag = 'UPLOAD'
job_list = list()     

# Initialize the 'inputs' dictionary
inputs = {}

# Add the file reference to the inputs dictionary
inputs['session_info'] = input_file  # The value is now the file reference object

    
# The destination for this analysis is the project
dest = project
try:
    # Build the label inline: formatted_timestamp() is only defined further down, in the BETA section
    analysis_label = f"{analysis_tag}_{datetime.now().strftime('%d-%m-%Y_%H-%M-%S')}"
    job_id = gear.run(
        analysis_label=analysis_label,
        inputs=inputs,
        destination=dest,
        tags=[''],
        config={}
    )
    job_list.append(job_id)
    print("Job submitted: check the Jobs Log for", dest.label)
except Exception as e:
    print(f"WARNING: Job cannot be sent for {dest.label}. Error: {e}")

### Advanced level: refactored as a function

In [None]:

def upload_and_run_gear(project, file_path, gear_name, analysis_tag):
    """Upload file_path to the project and run gear_name on it. Returns the job ID, or None on failure."""
    # Upload file
    upload_result = project.upload_file(file_path)
    input_file = upload_result[0] if upload_result else None
    
    if not input_file:
        print("Failed to upload file.")
        return None

    # Set up gear and run job (fw is assumed to be defined in the notebook environment, like fw_project)
    gear = fw.lookup(f'gears/{gear_name}')
    inputs = {'session_info': input_file}
    # Build the timestamp inline so this cell does not depend on helpers defined in later cells
    timestamp = datetime.now().strftime('%d-%m-%Y_%H-%M-%S')
    analysis_label = f'{analysis_tag}_{timestamp}'
    
    try:
        job_id = gear.run(analysis_label=analysis_label, inputs=inputs, destination=project, tags=[''])
        print("Job submitted:", job_id)
        return job_id
    except Exception as e:
        print(f"WARNING: Job could not be sent. Error: {e}")
        return None

        
upload_and_run_gear(fw_project, file_str, 'custom-information-sync', 'UPLOAD')


---

## BETA ##

### Making life complicated by defining functions
- The benefit is that helper functions can be reused
- In theory makes it easier to maintain

In [None]:

def formatted_timestamp():
    return datetime.now().strftime('%d-%m-%Y_%H-%M-%S')


def get_latest_gear_run(analyses, gear_name):
    """Returns the latest and second latest runs of the specified gear."""
    filtered_runs = [
        run for run in analyses
        if run.get('gear_info', {}).get('name', '').strip().casefold() == gear_name.casefold()
    ]
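    if len(filtered_runs) >= 2:
        return filtered_runs[-1], filtered_runs[-2]
    elif filtered_runs:
        return filtered_runs[-1], None
    # Always return a pair so the caller can unpack two values
    return None, None



project = fw_project
analyses = project.analyses
latest_gear_run, second_latest_gear_run = get_latest_gear_run(analyses, 'custom-information-sync')
# second_latest_gear_run is None when fewer than two runs exist
print(second_latest_gear_run.files if second_latest_gear_run else "No second run found")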


def download_latest_file(run, backup_run, download_dir):
    # Check if `run` exists and has files, otherwise fallback to `backup_run`
    file_object = run.files if run and run.files else (backup_run.files if backup_run and backup_run.files else None)
    
    if file_object:
        download_path = download_dir / file_object[0].name
        file_object[0].download(download_path)
        print(f"Using {'latest' if run and run.files else 'second latest'} run. File: {file_object[0].name}")
        return download_path
    
    print("No file found in specified runs.")
    print("Submitting a job to pull subject/session list")
    
    gear = fw.lookup('gears/custom-information-sync')
    # The destination for this analysis is the project (uses the notebook-level `project`)
    dest = project
    try:
        analysis_label = f'custom-information-sync_{formatted_timestamp()}'
        job_id = gear.run(
            analysis_label=analysis_label,
            inputs={},  # no input file: the gear is run only to pull the subject/session list
            destination=dest,
            tags=[''],
            config={}
        )
        print("Job submitted: check the Jobs Log for", dest.label)
    except Exception as e:
        print(f"WARNING: Job cannot be sent for {dest.label}. Error: {e}")
    # Nothing was downloaded in this branch
    return None


download_path = Path(home) / "Data/" 

download_latest_file(latest_gear_run, second_latest_gear_run, download_path)