In [1]:
!pip install opendp==0.14.1
!pip install polars==1.32.0



In [2]:
import numpy as np
import polars as pl
import matplotlib.pyplot as plt
import opendp.prelude as dp
import polars as pl

# Enable OpenDP's "contrib" feature flag for this session.
dp.enable_features("contrib")

In [None]:
# Lazily scan the example CSV (with date parsing enabled); this LazyFrame is the
# input of the plain-Polars plans further down.
dummy_lf = pl.scan_csv('example_pipeline_new.csv', try_parse_dates=True)
# Factory for the OpenDP context used by every "With OpenDP" example below.
def create_context(
    data_path="example_pipeline_new.csv",
    rho=10.0,
    delta=1e-6,
    split_evenly_over=1,
):
    """Build a new OpenDP compositor context over a lazily scanned CSV file.

    Every call scans the CSV again and returns a separate context, so each
    example can start from its own context.

    Args:
        data_path: Path of the CSV file, scanned with ``try_parse_dates=True``.
        rho: ``rho`` value forwarded to ``dp.loss_of``. OpenDP prints
            "rho should be less than or equal to 0.5" for the default of 10.0;
            the default is kept so existing ``create_context()`` calls behave
            exactly as before.
        delta: ``delta`` value forwarded to ``dp.loss_of``.
        split_evenly_over: Forwarded unchanged to ``dp.Context.compositor``.

    Returns:
        The context object returned by ``dp.Context.compositor``.
    """
    return dp.Context.compositor(
        data=pl.scan_csv(data_path, try_parse_dates=True),
        privacy_unit=dp.unit_of(contributions=1),
        privacy_loss=dp.loss_of(rho=rho, delta=delta),
        split_evenly_over=split_evenly_over,
        margins=[
            # Margin over the whole dataset (no grouping columns).
            dp.polars.Margin(max_length=1000, invariant="lengths"),
            # Margin for groupings by patient.
            dp.polars.Margin(by=["patient_id"]),
        ],
    )

# Pipeline combination

In [4]:
# Materialize the lazy frame to preview the dummy dataset.
dummy_lf.collect()

patient_id,case_id,year,hospital,region,date_of_admission,age,sex,diagnostic,treatment,treatment_date
i64,i64,i64,f64,str,date,i64,i64,str,str,str
5,56,2022,22.0,"""AUy""",2009-04-07,23,2,"""4WfjG""",,
29,42,2022,59.0,"""qJO""",2010-10-30,8,2,"""4wh""",,
85,69,2005,25.0,"""nex""",2020-12-12,64,1,"""94Sw""",,
70,91,2003,44.0,"""unR""",2006-10-27,2,2,"""GkT""",,
81,33,2005,23.0,"""Ywj""",2010-03-10,18,1,"""dv""",,
…,…,…,…,…,…,…,…,…,…,…
29,99,2003,94.0,"""Pqf""",2018-12-27,72,1,,"""ciHC1""","""2014-10-06"""
2,32,2015,42.0,"""qJO""",2012-03-14,86,2,,"""ETb""","""2023-09-03"""
90,48,2020,12.0,"""bg""",2023-02-15,99,1,,,"""2013-02-17"""
74,23,2020,12.0,"""GCS""",2009-05-03,54,1,,"""18nL""","""2018-07-13"""


## With Polars

In [5]:
# Per-patient record counts, restricted to records with diagnostic "C18"
plan_1 = (
    dummy_lf
    .filter(pl.col("diagnostic") == "C18")
    .group_by("patient_id")
    .agg(pl.len())
)

# Per-patient record counts, restricted to records with treatment "xx"
plan_2 = (
    dummy_lf
    .filter(pl.col("treatment") == "xx")
    .group_by("patient_id")
    .agg(pl.len())
)

# Join the two per-patient tables on patient_id and count the matching rows,
# i.e. the patients present in both
plan_join = plan_1.join(plan_2, on="patient_id").select(pl.len())

# Run the plan and show the single resulting number
plan_join.collect().item()

15

## With OpenDP

In [6]:
# Same "diagnostic == C18" per-patient count as in the Polars version, but
# expressed on an OpenDP query. Only the plan is built here; nothing is released.
context1 = create_context()
plan_dp_1 = (
    context1.query()
    .filter(pl.col("diagnostic") == "C18")
    .group_by(["patient_id"])
    .agg([pl.len()])
)

rho should be less than or equal to 0.5, and is typically less than or equal to 0.25


In [7]:
# Try to combine the two OpenDP pipelines with a join, as in the Polars version.
# `plan_dp_1` is an OpenDP query wrapper rather than a Polars LazyFrame, so
# `.join` rejects it with a TypeError. That error is what this cell demonstrates:
# catch and display it so the notebook still runs from top to bottom.
context2 = create_context()
try:
    plan_dp_2 = (
        context2.query().filter(pl.col("treatment")=="xx")
        .group_by(["patient_id"])
        .agg([
            pl.len()
        ])
        .join(plan_dp_1, on="patient_id")
        .select(pl.col("patient_id")).dp.len()
    )
except TypeError as err:
    print(f"{type(err).__name__}: {err}")

rho should be less than or equal to 0.5, and is typically less than or equal to 0.25


TypeError: expected `other` to be a 'LazyFrame', not 'opendp.extras.polars._wrap'

# with_columns (``.any()`` / ``.over()``)

## With Polars

In [8]:
# Tag every row with two per-patient flags (ever had diagnostic "C18", ever had
# treatment "xx"), keep the rows of patients with both, then count the distinct
# patients that remain.
plan = (
    dummy_lf
    .with_columns(
        has_c18=(pl.col("diagnostic") == "C18").any().over("patient_id"),
        has_xx=(pl.col("treatment") == "xx").any().over("patient_id"),
    )
    .filter(pl.col("has_c18") & pl.col("has_xx"))
    .select(pl.col("patient_id").n_unique())
)

plan.collect()

patient_id
u32
15


## With OpenDP

In [9]:
# For now, `.any().over(...)` is not recognised by OpenDP, so this query fails
# with an OpenDPException. The failure is what this cell demonstrates: catch and
# display it so the notebook still runs from top to bottom.
context = create_context()
released = None
try:
    plan_dp = (
        context.query()
        # Add per-patient flags indicating whether they ever had diagnostic C18 and treatment xx
        .with_columns(has_c18 = (pl.col("diagnostic") == "C18").any().over("patient_id"))
        .with_columns(has_xx = ((pl.col("treatment")=="xx").any().over("patient_id")))
        # Filter patients with both conditions
        .filter(pl.col("has_c18") & pl.col("has_xx"))
        # Count total patient (unique) with both cases
        .select(pl.col("patient_id").dp.n_unique())
    )
    released = plan_dp.release().collect()
except dp.OpenDPException as err:
    print(f"{type(err).__name__}: {err}")

# Shows the released frame if the query ever becomes supported; shows nothing otherwise.
released

rho should be less than or equal to 0.5, and is typically less than or equal to 0.25


OpenDPException: 
  MakeTransformation("Expr is not recognized at this time: [(col("diagnostic")) == ("C18")].any().over([col("patient_id")]). If you would like to see this supported, please file an issue.")

# with_columns (``.min()`` / ``.max()``)

## With Polars

In [10]:
# Polars: keep patients whose records span more than 15 years of age
# (per-patient max age minus min age), then count the distinct patients.
plan_diff = (
    dummy_lf
    .with_columns(
        age_min=pl.col("age").min().over("patient_id"),
        age_max=pl.col("age").max().over("patient_id"),
    )
    .with_columns(age_diff=pl.col("age_max") - pl.col("age_min"))
    .filter(pl.col("age_diff") > 15)
    .select(pl.col("patient_id").n_unique())
)

plan_diff.collect()

patient_id
u32
22


## With OpenDP

In [11]:
# `.min()` / `.max()` combined with `.over(...)` inside `with_columns` are not
# recognised by OpenDP either, so this query fails with an OpenDPException.
# The failure is what this cell demonstrates: catch and display it so the
# notebook still runs from top to bottom.
context = create_context()
released_diff = None
try:
    plan_diff_dp = (
        context.query()
        .with_columns(age_min = (pl.col("age")).min().over("patient_id"))
        .with_columns(age_max = ((pl.col("age")).max().over("patient_id")))
        .with_columns(age_diff  = (pl.col("age_max")) - (pl.col("age_min")))
        .filter(pl.col("age_diff")>15)
        .select(pl.col("patient_id").dp.n_unique())
    )
    released_diff = plan_diff_dp.release().collect()
except dp.OpenDPException as err:
    print(f"{type(err).__name__}: {err}")

# Shows the released frame if the query ever becomes supported; shows nothing otherwise.
released_diff

rho should be less than or equal to 0.5, and is typically less than or equal to 0.25


OpenDPException: 
  MakeTransformation("Expr is not recognized at this time: col("age").min().over([col("patient_id")]). If you would like to see this supported, please file an issue.")