In [17]:
%%capture
pip install 'opendp[polars]'

# Explore opendp==0.14
https://docs.opendp.org/en/stable/getting-started/quickstart.html

In [1]:
import opendp.prelude as dp
import polars as pl

dp.enable_features("contrib")

In [2]:
PATH = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/penguins.csv"
PATH = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
PATH = dp.examples.get_france_lfs_path()

In [3]:
df = pl.scan_csv(PATH, ignore_errors=True)

In [4]:
df.collect().head()

PIDENT,YEAR,QUARTER,SEX,AGE,HWUSUAL,HWACTUAL,ILOSTAT
i64,i64,i64,i64,f64,f64,f64,i64
1,2013,3,2,7.0,99.0,99.0,9
1,2013,2,2,7.0,99.0,99.0,9
1,2013,1,2,7.0,99.0,99.0,9
2,2005,4,2,20.0,99.0,99.0,3
2,2005,3,2,20.0,99.0,99.0,3


# ESSENTIAL

## COUNT

In [5]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=5),
    privacy_loss=dp.loss_of(epsilon=1.0),
    split_evenly_over=5,
)

In [6]:
## Count
query_num_responses = context.query().select(dp.len())
query_num_responses

In [7]:
print("count:", query_num_responses.release().collect().item())  

count: 3811934


## SUM

In [8]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=1.0),
    split_evenly_over=5,
    margins=[
        dp.polars.Margin(
            max_length=150_000* 36
        ),
    ],
)

In [9]:
query_work_hours = (
    context.query().filter(pl.col("HWUSUAL") != 99.0)
    .select(
        pl.col.HWUSUAL.cast(int)
        .fill_null(35)
        .dp.sum(bounds=(0, 80))
    )
)

In [10]:
query_work_hours.summarize(alpha=0.05)

column,aggregate,distribution,scale,accuracy
str,str,str,f64,f64
"""HWUSUAL""","""Sum""","""Integer Laplace""",14400.0,43139.04473


In [11]:
print("HWUSUAL:", query_work_hours.release().collect().item())

HWUSUAL: 56295484


## MEAN

In [12]:
query_work_hours = (
    context.query().filter(pl.col.HWUSUAL != 99.0)
    .select(
        pl.col.HWUSUAL.cast(int).dp.sum(bounds=(0, 80)),
        dp.len(),
    )
)

In [13]:
query_work_hours.release().collect().with_columns(
    mean=pl.col.HWUSUAL / pl.col.len
) 

HWUSUAL,len,mean
i64,u32,f64
56341083,1495878,37.664223


In [14]:
context_bounded_dp = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=1.0),
    split_evenly_over=5,
    margins=[
        dp.polars.Margin(
            max_length=150_000 * 36,            
            invariant="lengths", # don't protect the total number of records (bounded-DP)
        ),
    ],
)

In [15]:
query_mean_work_hours = context_bounded_dp.query().select(
    pl.col.HWUSUAL.cast(int).dp.mean(bounds=(0, 80))
)

In [16]:
print("mean:", query_mean_work_hours.release().collect().item(),)

mean: 63.153345337744746


## MEDIAN

In [17]:
candidates = list(range(20, 60))

query_median_hours = (
    context.query()
    .filter(pl.col.HWUSUAL != 99.0)
    .select(
        pl.col.HWUSUAL.cast(int).dp.median(candidates)
    )
)

In [18]:
print("median:", query_median_hours.release().collect())  

median: shape: (1, 1)
┌─────────┐
│ HWUSUAL │
│ ---     │
│ i64     │
╞═════════╡
│ 37      │
└─────────┘


## QUANTILES

In [19]:
query_multi_quantiles = (
    context.query()
    .filter(pl.col.HWUSUAL != 99.0)
    .select(
        pl.col.HWUSUAL.cast(int)
        .dp.quantile(a, candidates)
        .alias(f"{a}-Quantile")
        for a in [0.25, 0.5, 0.75]
    )
)

In [20]:
query_multi_quantiles.release().collect()  

0.25-Quantile,0.5-Quantile,0.75-Quantile
i64,i64,i64
35,37,40


# GROUPING

## STABLE KEYS (spend delta - only show groups big enough)

In [27]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=1.0 / 4, delta=1e-7),
    split_evenly_over=1,
)

In [28]:
query_age_ilostat = (
    context.query()
    .group_by("AGE", "ILOSTAT")
    .agg(dp.len())
)

In [29]:
tmp_df = query_age_ilostat.release().collect()
tmp_df.head(2)

AGE,ILOSTAT,len
f64,i64,u32
20.0,3,301424
65.0,3,618015
32.0,3,85027
7.0,9,689013
47.0,2,45084
…,…,…
65.0,2,13196
75.0,3,349294
47.0,1,652683
20.0,1,133260


## EXPLICIT KEYS (does not spend delta)

In [30]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=1.0 / 4),
    split_evenly_over=1,
)

In [31]:
query_age_ilostat = (
    context.query()
    .group_by("AGE", "ILOSTAT")
    .agg(dp.len())
    .with_keys(tmp_df["AGE", "ILOSTAT"])
)

In [32]:
query_age_ilostat.release().collect().head(2)

AGE,ILOSTAT,len
f64,i64,u32
47.0,2,45306
20.0,3,301091
32.0,1,520176
47.0,3,97220
47.0,1,653072
…,…,…
32.0,3,84802
32.0,2,57091
20.0,2,40180
65.0,1,209512


## INVARIANT GROUP KEYS

In [38]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=1.0 / 4),
    split_evenly_over=1,
    margins=[
        dp.polars.Margin(by=["YEAR", "QUARTER"], invariant="keys") # group keys when grouped by "YEAR" and "QUARTER" are invariant
    ],
)

In [39]:
query_quarterly_counts = (
    context.query()
    .group_by("YEAR", "QUARTER")
    .agg(dp.len())
)

In [40]:
query_quarterly_counts.release().collect().head(2)

YEAR,QUARTER,len
i64,i64,u32
2008,2,87628
2009,3,104913


## INVARIANT GROUP LENGTHS

In [41]:
# filtering the data within the query results in the margin info being invalidated. One way to work around this limitation is to preprocess your data before passing it into the context
lf_preprocessed = df.filter(pl.col("HWUSUAL") < 99)

In [42]:
context = dp.Context.compositor(
    data=lf_preprocessed,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=1.0, delta=1e-7),
    split_evenly_over=1,
    margins=[
        # total number of responses when grouped by "SEX" is public information
        dp.polars.Margin(
            by=["SEX"],
            invariant="lengths",
            max_length=150_000 * 36,
            max_groups=3, # encoding is M, F or null
        )
    ],
)

In [43]:
query_work_hours = (
    context.query()
    .group_by("SEX")
    .agg(pl.col.HWUSUAL.cast(int).dp.mean((0, 98))) # no budget for length as assumed known public info
)

In [44]:
query_work_hours.release().collect().head(2)

SEX,HWUSUAL
i64,f64
1,40.782522
2,34.223871


# IDENTIFIER TRUNCATION

In [62]:
privacy_unit = dp.unit_of(
    contributions=1, identifier=pl.col("PIDENT")
)

In [63]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=privacy_unit,
    privacy_loss=dp.loss_of(epsilon=1.0, delta=1e-8),
    split_evenly_over=4,
    margins=[dp.polars.Margin(max_length=150_000 * 36)],
)

## TRUNCATE PER GROUP CONTRIBUTION

In [52]:
query = (
    context.query()
    .filter(pl.col.HWUSUAL != 99)
    .truncate_per_group(10) # == .filter(pl.int_range(pl.len()).over("PIDENT") < 10)
    .select(pl.col.HWUSUAL.cast(int).dp.mean((0, 80)))
)

In [53]:
query.release().collect()

HWUSUAL
f64
37.63341


## TRUNCATE CONTRIBUTED GROUPS

To release queries that involve identifier columns and grouping, 
- it is also necessary to bound the number of groups an individual may contribute to,
- and not just the number of contributions per-group.

In [64]:
quarterly = [pl.col.QUARTER, pl.col.YEAR]
query = (
    context.query()
    .filter(pl.col.HWUSUAL != 99)
    .truncate_per_group(1, by=quarterly)   # == .filter(pl.int_range(pl.len()).over("PIDENT", *quarterly) < 1)
    .truncate_num_groups(10, by=quarterly) # == .filter(pl.struct(*quarterly).rank("dense").over("PIDENT") < 10)
    .group_by(quarterly)
    .agg(
        dp.len(),
        pl.col.HWUSUAL.cast(int).dp.sum((0, 80)),
    )
)

In [65]:
query.summarize()

column,aggregate,distribution,scale,threshold
str,str,str,f64,u32
"""len""","""Frame Length""","""Integer Laplace""",80.0,1714.0
"""HWUSUAL""","""Sum""","""Integer Laplace""",6400.0,


In [66]:
query.release().collect()

OpenDPException: 
  FailedFunction("Duplicate(ErrString("multiple fields with name 'QUARTER' found\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING 'sink' <---\nFILTER [(col(\"QUARTER\").as_struct([col(\"YEAR\")]).hash().as_struct([col(\"QUARTER\").as_struct([col(\"YEAR\")])]).rank().over([col(\"PIDENT\")])) < (dyn int: 10)]\nFROM\n  FILTER [(0.int_range([len().cast(Int64)]).shuffle().over([col(\"PIDENT\"), col(\"QUARTER\"), col(\"YEAR\")])) < (1)]\n  FROM\n    FILTER [(col(\"HWUSUAL\")) != (99.0)]\n    FROM\n      Csv SCAN [/opt/python/lib/python3.13/site-packages/opendp/extras/examples/v2_france_lfs.csv]\n      PROJECT */8 COLUMNS"))")

# MICRODATA

## WITH COLUMNS
Expressions passed into .with_columns must be row-by-row, meaning that the expression could be represented as a function applied to each row in the data.

Any new columns added by .with_columns do not (currently) have margin descriptors. For instance, in the above query, any margin descriptors related to HWUSUAL would no longer apply to the new, shadowing, HWUSUAL column after .with_columns.

In [80]:
context = dp.Context.compositor(
    data=df,
    privacy_unit=dp.unit_of(contributions=36),
    privacy_loss=dp.loss_of(epsilon=2.0, delta=1e-6),
    split_evenly_over=10,
    margins=[dp.polars.Margin(max_length=150_000 * 36)],
)

In [81]:
query_hwusual_binned = (
    context.query()
    # shadows the usual work hours "HWUSUAL" column with binned data
    .with_columns(
        pl.col.HWUSUAL.cut(
            breaks=[0, 20, 40, 60, 80, 98],
            left_closed=True,
        )
    )
    .group_by(pl.col.HWUSUAL)
    .agg(dp.len())
)

In [82]:
query_hwusual_binned.release().collect().sort("HWUSUAL").head(2)

HWUSUAL,len
cat,u32
,20988
"""[0, 20)""",89061


## SELECT
resolves each passed expression to a column and then returns those columns

## FILTER
.filter uses row-by-row expressions of booleans to mask rows.

Filtering discards all invariants about the group keys and group sizes. Margin descriptors are considered applicable for the input dataset, so a data-dependent filtering renders these invariants invalid.

Otherwise, filtering preserves all other margin descriptors, because filtering only ever removes rows.

In [83]:
query_total_hours_worked = (
    context.query()
    .with_columns(pl.col.HWUSUAL.cast(int))
    .filter(pl.col.HWUSUAL != 99)
    .select(pl.col.HWUSUAL.dp.sum((0, 80)))
)

In [84]:
print("sum:", query_total_hours_worked.release().collect().item())  

sum: 56343318


## GROUP BY (PRIVATE)
.group_by also resolves each passed expression to a column, and then groups on those columns. must be row-by-row.

In [85]:
query_hwusual_binned = (
    context.query()
    .group_by(
        pl.col.HWUSUAL.cut(
            [0, 20, 40, 60, 80, 98], left_closed=True
        )
    )
    .agg(dp.len())
)

In [86]:
query_hwusual_binned.release().collect().sort("HWUSUAL")  

HWUSUAL,len
cat,u32
,21183
"""[0, 20)""",89284
"""[20, 40)""",908273
"""[40, 60)""",407000
"""[60, 80)""",78344
"""[80, 98)""",13278
"""[98, inf)""",2295225


## GROUP BY (STABLE)
group_by/agg can also be used earlier in the data pipeline, before the private group_by/agg or select aggregation --> multi group by ? 

appealing because arbitrary expressions can be used in the agg argument, 

but a large amount of data is needed to get reasonable utility.

In [88]:
query_hwusual_binned = (
    context.query()
    .filter(pl.col.HWUSUAL != 99)
    # group 1000 ways
    .group_by(pl.col.PIDENT % 1000)
    .agg(pl.col.HWUSUAL.min())
    # up to 1000 records left to work with to compute a DP mean
    .select(pl.col.HWUSUAL.cast(int).dp.mean((0, 30)))
)

In [89]:
query_hwusual_binned.release().collect()

HWUSUAL
f64
-2.026034


Inform context of number of user in group depending on groupby

In [90]:
context_pident = dp.Context.compositor(
    data=pl.scan_csv(
        dp.examples.get_france_lfs_path(),
        ignore_errors=True,
    ),
    privacy_unit=dp.unit_of(
        contributions=[
            dp.polars.Bound(per_group=36),
            # a user can only be in one group at a time when grouped this way
            dp.polars.Bound(
                by=[pl.col.PIDENT % 1000], num_groups=1
            ),
        ]
    ),
    privacy_loss=dp.loss_of(epsilon=1.0, delta=1e-7),
    split_evenly_over=4,
    margins=[dp.polars.Margin(max_length=150_000 * 36)],
)

In [91]:
query_hwusual_binned = (
    context_pident.query()
    .filter(pl.col.HWUSUAL != 99)
    # group 1000 ways
    .group_by(pl.col.PIDENT % 1000)
    .agg(pl.col.HWUSUAL.min())
    # up to 1000 records left to work with to compute a DP mean
    .select(pl.col.HWUSUAL.cast(int).dp.mean((0, 30)))
)

In [92]:
query_hwusual_binned.release().collect()

HWUSUAL
f64
0.177914
