# Example: Federated Join
Federated Join is a feature that lets you run SQL JOIN-like queries across datasets held at different sites, as if the data were available locally.

The **Centralized Result** is the result we would get if all the data were in one location.  
The **Federated Result** is the result we get when we calculate the same metric in a federated fashion.
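
To make the two terms concrete, here is a minimal pure-Python sketch. It assumes each site shares only a `(sum, count)` aggregate; that is one common way to federate a mean, not necessarily how the Rhino Health Platform computes it. The readings and helper names are hypothetical.

```python
# Hypothetical SpO2 readings held at two sites (illustrative values only).
site_a = [97.0, 95.0, 99.0]
site_b = [94.0, 98.0]


def local_summary(values):
    """Each site shares only an aggregate: (sum, count)."""
    return sum(values), len(values)


def federated_mean(summaries):
    """Combine the per-site aggregates into a single mean."""
    total = sum(s for s, _ in summaries)
    count = sum(n for _, n in summaries)
    return total / count


# Centralized: pool every reading in one place, then average.
pooled = site_a + site_b
centralized_result = sum(pooled) / len(pooled)

# Federated: average using only the per-site aggregates.
federated_result = federated_mean([local_summary(site_a), local_summary(site_b)])

print(centralized_result, federated_result)
```

Both paths arrive at the same mean even though the federated path never sees individual rows from the other site.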

Federated Join can be performed in two modes:  
**Intersection Mode**: Similar to a SQL INNER JOIN query  
**Union Mode**: Similar to a SQL FULL OUTER JOIN query  


### Load all necessary libraries, including 'rhino_health'

In [106]:
import os
import sys 
from getpass import getpass, getuser

import rhino_health

### Log in to the Rhino Health Platform

**Note: Replace "my_username" with your Rhino Health username.**

In [None]:
print("Logging In")
my_username = "my_email@example.com" # Replace this with the email you use to log into Rhino Health

my_password = getpass()
session = rhino_health.login(username=my_username, password=my_password, show_traceback=True)

print("Logged In")

# Intersection Mode

The data we are interested in lives at two different sites.  
Site 1 has Patient Identifier, Age, and Blood type. You can view the data in `./FederatedJoinFilterCohort.csv`  
Site 2 has Patient Identifier, Gender, SpO2, and Age values. You can view the data in `./FederatedJoinDataCohort.csv`  

*Note: Patient Age values deliberately differ between Site 1 and Site 2 to simulate data discrepancies.*

Intersection Mode allows us to perform an INNER JOIN between the two datasets. This join returns results only for entries whose identifiers appear in both datasets, as shown below.

<img src="left_join.png" width="200">
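
The join semantics can be sketched in plain Python without the platform. The rows below are hypothetical and do not come from the sample CSVs; the sketch only illustrates the idea of filtering at one site and fetching values from the other by identifier.

```python
# Hypothetical rows; the real sample CSVs may contain different values.
filter_rows = [  # Site 1: identifier, Age, BloodType
    {"UID": "p1", "Age": 40, "BloodType": "a"},
    {"UID": "p2", "Age": 30, "BloodType": "b"},
    {"UID": "p3", "Age": 50, "BloodType": "a"},
]
data_rows = [  # Site 2: identifier, Gender, SpO2
    {"UID": "p1", "Gender": "m", "SpO2": 96.0},
    {"UID": "p3", "Gender": "f", "SpO2": 98.0},
    {"UID": "p4", "Gender": "m", "SpO2": 90.0},
]

# Step 1: filter at Site 1 and keep only the matching identifiers.
matching_uids = {row["UID"] for row in filter_rows if row["Age"] > 35}

# Step 2: keep Site 2 rows whose identifier appears in that set.
joined = [row for row in data_rows if row["UID"] in matching_uids]
joined_uids = sorted(row["UID"] for row in joined)

# Step 3: compute the metric over the joined rows only.
mean_spo2 = sum(row["SpO2"] for row in joined) / len(joined)
print(joined_uids, mean_spo2)
```

Patient `p4` exists only at Site 2 and `p2` fails the age filter, so neither contributes to the mean.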

### Setup Sample Project
Create the project that will hold this example.

**Note: Replace "WORKGROUP_UID" with your Rhino Health Workgroup UID.**

In [None]:
WORKGROUP_UID = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"  # Replace this with your workgroup UID

from rhino_health.lib.endpoints.project.project_dataclass import ProjectCreateInput

new_project = ProjectCreateInput(
    name="Federated Join Metrics",
    description="Example Project for Federated Join",
    type="Validation",
    primary_workgroup_uid=WORKGROUP_UID,
)
project = session.project.add_project(new_project)

### Setup Sample Dataschema
Create the dataschema for our data

*Note: You can also use inferred data schemas if you do not have one provided*

In [None]:
from rhino_health.lib.endpoints.data_schema.data_schema_dataclass import DataschemaCreateInput

dataschema_input = DataschemaCreateInput(
    name="Federated Join Input Schema",
    description="Federated Join Input Schema",
    primary_workgroup_uid=WORKGROUP_UID,
    projects=[project.uid],
    file_path="./FederatedDataSchema.csv",
)
dataschema = session.data_schema.create_data_schema(dataschema_input)
data_schema_uid = dataschema.uid

### Import Cohorts
Import the cohorts to be used. You will need to upload the sample CSVs included in this tutorial to your on-prem agent.

**Note: Replace "DATA_LOCATION" with the location you uploaded the example cohort data.**

#### Import Filter Cohort

The **Filter Cohort** is represented by the right side B in the Join Diagram below. It is where our **Unique Identifiers** live. **Unique Identifiers** are deidentified identifiers that exist in both datasets. We perform filter operations on this cohort to be used later when fetching the data.

<img src="left_join.png" width="200">

In [None]:
DATA_LOCATION = "/rhino_data"  # Replace me with the folder path to where you uploaded the sample data

from rhino_health.lib.endpoints.cohort.cohort_dataclass import CohortCreateInput

filter_cohort_input = CohortCreateInput(
    name="Blood Test Results",
    description="Identifiers with Blood Type, No SPO2 or Gender",
    project_uid=project.uid,
    workgroup_uid=WORKGROUP_UID,
    data_schema_uid=data_schema_uid,
    csv_filesystem_location=f"{DATA_LOCATION}/FederatedJoinFilterCohort.csv",
    image_filesystem_location="",
    method="filesystem",
    is_data_deidentified=True,
    file_base_path="",
)
filter_cohort_at_site1 = session.cohort.add_cohort(filter_cohort_input)

print(filter_cohort_at_site1.dict(include={'uid', 'import_status'}))

#### Import Data Cohort

The **Data Cohort** is represented by the left side A in the Join Diagram below. It is where the data we are interested in lives. Using the **Unique Identifiers** we found in the earlier Filter Cohort, we get results which match the metric specification.

<img src="left_join.png" width="200">

*Note: For this tutorial we are only using one on-prem agent. If you have multiple agents, replace WORKGROUP_UID with your second agent's Workgroup UID.*

In [None]:
first_data_cohort_input = CohortCreateInput(
    name="SpO2 Values on 1/1",
    description="Identifiers with SPO2 and Gender no Blood Type",
    project_uid=project.uid,
    workgroup_uid=WORKGROUP_UID,
    data_schema_uid=data_schema_uid,
    csv_filesystem_location=f"{DATA_LOCATION}/FederatedJoinDataCohort.csv",
    image_filesystem_location="",
    method="filesystem",
    is_data_deidentified=True,
    file_base_path="",
)
first_data_cohort_at_site2 = session.cohort.add_cohort(first_data_cohort_input)

print(first_data_cohort_at_site2.dict(include={'uid', 'import_status'}))

The following cell imports the metric classes used in the examples below.

In [None]:
from rhino_health.lib.metrics import Count, Mean, StandardDeviation

### Federated Mean

Let's start with a simple example. We will get the mean SpO2 for patients older than 35. In this initial query, we will trust Site 1's Age data over Site 2's.

In [None]:
print("SpO2 values in data cohort for patients > 35 years old")
configuration = Mean(
    variable="SpO2",
    join_field={"data_column": "UID", "filter_column": "Age", "filter_value": 35, "filter_type": ">"},
)

federated_results = session.project.joined_cohort_metric(
    filter_cohorts=[filter_cohort_at_site1.uid],
    data_cohorts=[first_data_cohort_at_site2.uid],
    configuration=configuration
)

print(f"Federated Mean {federated_results.output}")

We can compare **Federated Results** to the **Centralized Results** and verify that we get the same response.

In [None]:
import pandas as pd
import numpy as np
filter_df = pd.read_csv("./FederatedJoinFilterCohort.csv")
first_data_df = pd.read_csv("./FederatedJoinDataCohort.csv")

centralized_result = np.mean(first_data_df.SpO2[filter_df["Age"] > 35])
print(f"Centralized Mean: {centralized_result}")
federated_result = federated_results.output['mean']
print(f"Results Match: {centralized_result == federated_result}")
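
Exact `==` between floats can report a mismatch when the two sides add the same numbers in a different order. The sketch below uses hypothetical values and assumes the federated side sums per-site partials; it shows a tolerance-based comparison with `math.isclose` as one possible alternative.

```python
import math

# Hypothetical readings; the federated path is assumed to sum per-site partials.
site_partials = [[0.1, 0.2], [0.3]]
all_values = [0.3, 0.2, 0.1]  # the same readings, pooled in a different order

centralized = sum(all_values) / len(all_values)
federated = sum(sum(p) for p in site_partials) / sum(len(p) for p in site_partials)

# Compare within a relative tolerance instead of requiring bit-identical floats.
results_match = math.isclose(centralized, federated, rel_tol=1e-9)
print(f"Results Match: {results_match}")
```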

We can also filter the results. Here we calculate the mean for male patients (Gender is stored at Site 2).

In [None]:
print("SpO2 values in data cohort for male patients > 35 years old")

configuration = Mean(
    variable={"data_column": "SpO2", "filter_column": "Gender", "filter_value": "m", "filter_type": "="},
    join_field={"data_column": "UID", "filter_column": "Age", "filter_value": 35, "filter_type": ">"},
)

federated_results = session.project.joined_cohort_metric(
    filter_cohorts=[filter_cohort_at_site1.uid],
    data_cohorts=[first_data_cohort_at_site2.uid],
    configuration=configuration
)

federated_result = federated_results.output['mean']
print(f"Federated Mean: {federated_result}")

a = filter_df["Age"] > 35
b = first_data_df["Gender"] == "m"
centralized_result = np.mean(first_data_df.SpO2[a&b])
print(f"Centralized Mean: {centralized_result}")
print(f"Results Match: {centralized_result == federated_result}")

Data can also be grouped by fields found in the Data Cohort (Site 2).  
For example, we can get results for each gender without filtering on a specific one.

*Note: The field to group by must exist in the Data Cohort*

In [None]:
print("Works with grouping. SpO2 values for both genders >35 years old")

configuration = Mean(
    variable="SpO2",
    join_field={"data_column": "UID", "filter_column": "Age", "filter_value": 35, "filter_type": ">"},
    group_by={"groupings": ["Gender"]},
)

federated_results = session.project.joined_cohort_metric(
    filter_cohorts=[filter_cohort_at_site1.uid],
    data_cohorts=[first_data_cohort_at_site2.uid],
    configuration=configuration
)

print(f"Federated Results by Gender: {federated_results.output}")

c = first_data_df["Gender"] == "f"
female_centralized_result = np.mean(first_data_df.SpO2[a&c])
print(f"New Results Match: {female_centralized_result == federated_results.output['f']['mean']}")
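
As a rough illustration of what grouping does, the following sketch averages SpO2 per Gender over a few hypothetical joined rows. The nested `{"mean": ...}` shape is chosen to mirror how the grouped output is indexed in the cell above; it is not taken from the SDK.

```python
from collections import defaultdict

# Hypothetical joined rows (already restricted to matching identifiers).
joined_rows = [
    {"UID": "p1", "Gender": "m", "SpO2": 96.0},
    {"UID": "p3", "Gender": "f", "SpO2": 98.0},
    {"UID": "p5", "Gender": "m", "SpO2": 94.0},
]

# Collect SpO2 values per Gender, then average each group.
groups = defaultdict(list)
for row in joined_rows:
    groups[row["Gender"]].append(row["SpO2"])

means_by_gender = {
    gender: {"mean": sum(values) / len(values)}
    for gender, values in groups.items()
}
print(means_by_gender)
```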

We can add additional filters as desired. For example, we can filter on Age in both datasets instead of only one. We can also add in filtering on Blood Type.

In [None]:
print("Filtering on multiple data columns")

configuration = Mean(
    variable="SpO2",
    join_field="UID",
    data_filters=[
        {
            "filter_column": "Age",
            "filter_value": 35,
            "filter_type": ">",
            # In intersection mode, a filter with no "filter_cohort" applies to both cohorts.
        },
        {
            "filter_column": "BloodType",
            "filter_value": "a",
            "filter_type": "=",
            "filter_cohort": filter_cohort_at_site1.uid,
        },
        {
            "filter_column": "Gender",
            "filter_value": "m",
            "filter_type": "=",
            "filter_cohort": first_data_cohort_at_site2.uid,
        }
    ],
)

federated_results = session.project.joined_cohort_metric(
    filter_cohorts=[filter_cohort_at_site1.uid],
    data_cohorts=[first_data_cohort_at_site2.uid],
    configuration=configuration
)
print(f"Federated Multi-Filter Mean: {federated_results.output}")
federated_result = federated_results.output['mean']

a = filter_df["Age"] > 35
b = filter_df["BloodType"] == "a"
valid_uids = first_data_df.UID.isin(filter_df[a&b].UID)
c = first_data_df["Age"] > 35
d = first_data_df["Gender"] == "m"
centralized_result = np.mean(first_data_df.SpO2[valid_uids&c&d])
print(f"Centralized Multi-Filter Mean: {centralized_result}")
print(f"Results Match: {centralized_result == federated_result}")

# Union Mode

In this example use case, we have three datasets for the same metric.

> - Current Latest SpO2 from Site 2 (From Intersection Example)
> - Backup dataset with old SpO2 values. See `FederatedJoinUnionCohort1.csv`
> - Separate initial diagnostics laboratory where patients might have transferred from. See `FederatedJoinUnionCohort2.csv`

Union Mode allows us to perform a FULL OUTER JOIN across the datasets above. We treat our own current dataset as the most reliable and query it first. If a patient is not found there, we fall back to the backup copy, and finally to the separate diagnostics lab. The results are deduplicated, which prevents double counting patients who may have seen multiple providers.

<img src="full_outer_join.png" width="200">
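
The fallback-and-deduplicate behavior described above can be sketched in plain Python. The rows are hypothetical, filters are omitted for brevity, and `union_by_priority` is a made-up helper rather than anything from the SDK.

```python
# Hypothetical cohorts listed in priority order (most trusted first).
current = [{"UID": "p1", "SpO2": 97.0}, {"UID": "p2", "SpO2": 95.0}]
backup = [{"UID": "p2", "SpO2": 90.0}, {"UID": "p3", "SpO2": 93.0}]
lab = [{"UID": "p3", "SpO2": 80.0}, {"UID": "p4", "SpO2": 99.0}]


def union_by_priority(cohorts):
    """Keep the first row seen for each UID, scanning cohorts in order."""
    chosen = {}
    for cohort in cohorts:
        for row in cohort:
            # setdefault leaves an existing entry alone, so earlier cohorts win.
            chosen.setdefault(row["UID"], row)
    return chosen


merged = union_by_priority([current, backup, lab])
union_mean = sum(row["SpO2"] for row in merged.values()) / len(merged)
print(sorted(merged), union_mean)
```

Each patient is counted once: `p2` comes from the current dataset and `p3` from the backup, even though both also appear in a lower-priority cohort.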

### Import Cohorts
You will need to upload the sample CSVs included in this tutorial to your on-prem agent just like before.

*Note: In Union Mode, all cohorts are data cohorts.*

In [None]:
second_data_cohort_input = CohortCreateInput(
    name="SpO2 Values (Old)",
    description="Identifiers with SPO2 and Gender no Blood Type",
    project_uid=project.uid,
    workgroup_uid=WORKGROUP_UID,
    data_schema_uid=data_schema_uid,
    csv_filesystem_location=f"{DATA_LOCATION}/FederatedJoinUnionCohort1.csv",
    image_filesystem_location="",
    method="filesystem",
    is_data_deidentified=True,
    file_base_path="",
)
second_data_cohort_old_values = session.cohort.add_cohort(second_data_cohort_input)

third_data_cohort_input = CohortCreateInput(
    name="SpO2 Values at Diagnostics Lab",
    description="Identifiers with SPO2 and Gender no Blood Type",
    project_uid=project.uid,
    workgroup_uid=WORKGROUP_UID,
    data_schema_uid=data_schema_uid,
    csv_filesystem_location=f"{DATA_LOCATION}/FederatedJoinUnionCohort2.csv",
    image_filesystem_location="",
    method="filesystem",
    is_data_deidentified=True,
    file_base_path="",
)
third_data_cohort_at_lab = session.cohort.add_cohort(third_data_cohort_input)


union_cohort_uids = [first_data_cohort_at_site2.uid, second_data_cohort_old_values.uid, third_data_cohort_at_lab.uid]

print(union_cohort_uids)

Load the new cohorts for comparison between **Federated Results** and **Centralized Results**

In [None]:
second_data_df = pd.read_csv("./FederatedJoinUnionCohort1.csv")
third_data_df = pd.read_csv("./FederatedJoinUnionCohort2.csv")

### Query Metric
Once our data is ready, we can query it using any combination of `data_filters`.

*Note: In Union Mode, all cohorts are data cohorts.*

In [None]:
print("Union mean using data filters")

configuration = Mean(
    variable="SpO2",
    join_field="UID",
    join_mode="union",
    data_filters=[
        {
            "filter_column": "Age",
            "filter_value": 35,
            "filter_type": ">",
            # Applied to every Union cohort
        },
        {
            "filter_column": "Gender",
            "filter_value": "m",
            "filter_type": "=",
            "filter_cohort": second_data_cohort_old_values.uid,
        },
        {
            "filter_column": "Gender",
            "filter_value": "f",
            "filter_type": "=",
            "filter_cohort": third_data_cohort_at_lab.uid,
        },
    ],
)

federated_results = session.project.joined_cohort_metric(
    data_cohorts=union_cohort_uids,
    configuration=configuration
)

print(f"Federated Union Mean: {federated_results.output}")

federated_result = federated_results.output['mean']

one = first_data_df[(first_data_df.Age > 35)]
two = second_data_df[(second_data_df.Age > 35) & (second_data_df.Gender == "m") & (~second_data_df.UID.isin(one.UID))]
three = third_data_df[(third_data_df.Age > 35) & (third_data_df.Gender == "f") & (~third_data_df.UID.isin(two.UID)) & (~third_data_df.UID.isin(one.UID))]

centralized_result = np.mean(pd.concat([one.SpO2, two.SpO2, three.SpO2]))

print(f"Centralized Union Mean: {centralized_result}")
print(f"Results Match: {centralized_result == federated_result}")
            

We can also use shorthand FilterVariables instead of data_filters.

In [None]:
print("Union mean using FilterVariable Shorthand")

from rhino_health.lib.metrics.base_metric import JoinMode

configuration = Mean(
    variable={"data_column": "SpO2", "filter_column": "Gender", "filter_value": "m", "filter_type": "="},
    join_field={"data_column": "UID", "filter_column": "Age", "filter_value": 35, "filter_type": ">"},
    join_mode=JoinMode.UNION
)

federated_results = session.project.joined_cohort_metric(
    data_cohorts=union_cohort_uids,
    configuration=configuration
)

print(f"Federated Union Mean: {federated_results.output}")

federated_result = federated_results.output['mean']

one = first_data_df[(first_data_df.Age > 35) & (first_data_df.Gender == "m")]
two = second_data_df[(second_data_df.Age > 35) & (second_data_df.Gender == "m") & (~second_data_df.UID.isin(one.UID))]
three = third_data_df[(third_data_df.Age > 35) & (third_data_df.Gender == "m") & (~third_data_df.UID.isin(two.UID)) & (~third_data_df.UID.isin(one.UID))]

centralized_result = np.mean(pd.concat([one.SpO2, two.SpO2, three.SpO2]))
print(f"Centralized Union Mean: {centralized_result}")

print(f"Results Match: {centralized_result == federated_result}")
