# Graceful fallback
- Let's run a few tests and check how gracefully we fall back to Tumult when we encounter unsupported cases!

In [9]:
# %load_ext autoreload
# %autoreload 2

import warnings
# Warning filters for the whole notebook:
#   - UserWarning    -> 'ignore': never shown.
#   - RuntimeWarning -> 'always': shown on every occurrence, not only the first one.
# NOTE(review): presumably the fallback notices (the `warn(` lines in the cell outputs
# below) are RuntimeWarnings — confirm against tmlt.turbo.
warnings.filterwarnings(action='ignore', category=UserWarning)
warnings.filterwarnings(action='always', category=RuntimeWarning)
# warnings.simplefilter('always', RuntimeWarning)

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from tmlt.analytics.keyset import KeySet
from tmlt.analytics.privacy_budget import PureDPBudget
from tmlt.analytics.protected_change import AddMaxRows
from tmlt.analytics.query_builder import QueryBuilder
from termcolor import colored

from turbo.core import Accuracy
from tmlt.turbo import TurboSession

def print_budget(prev_budget, remaining_privacy_budget):
    """Print how much epsilon was spent and how much is left.

    Args:
        prev_budget: Budget object captured before the query ran; its
            ``_epsilon`` attribute is read.
        remaining_privacy_budget: Budget object captured after the query ran;
            its ``_epsilon`` attribute is read.

    The consumed amount (``prev - remaining``) is printed in red and the
    remaining amount in green, both converted with ``to_float(round_up=True)``.
    """
    spent = prev_budget._epsilon - remaining_privacy_budget._epsilon
    spent_text = f"Consumed Budget: {spent.to_float(round_up=True)} \n"
    print(colored(spent_text, "red"))

    left = remaining_privacy_budget._epsilon
    left_text = f"Remaining Budget: {left.to_float(round_up=True)} \n"
    print(colored(left_text, "green"))
    

# Read dataset
import os  # kept local to this step: only used to resolve the dataset location

# Location of the Citi Bike CSV. Set the CITIBIKE_CSV environment variable to
# point at your own copy; the default is the path this notebook was written with.
CITIBIKE_CSV = os.environ.get(
    "CITIBIKE_CSV", "/home/kelly/tumult/spark-warehouse/citibike.csv"
)

spark = SparkSession.builder.getOrCreate()
# spark.sparkContext.setLogLevel("OFF")
# Register the file with Spark, then read it back through SparkFiles under its
# base name (derived from the path so the two can never drift apart).
# Re-running this cell makes Spark log a "has been added already" WARN.
spark.sparkContext.addFile(CITIBIKE_CSV)
citibike_df = spark.read.csv(
    SparkFiles.get(os.path.basename(CITIBIKE_CSV)), header=True, inferSchema=True
)

# Turbo requires a user-supplied configuration.
#   alpha / beta    : the pair later passed as Accuracy(alpha, beta) by the cells
#                     that request Turbo's own accuracy target.
#   histogram_cfg   : settings for Turbo's histogram
#                     (NOTE(review): exact semantics live in Turbo — confirm there).
#   attributes_info : ordered (attribute name, list of bucket labels) pairs.
turbo_config = dict(
    alpha=0.05,
    beta=0.001,
    histogram_cfg=dict(learning_rate=4, heuristic="bin_visits:5-1", tau=0.01),
    attributes_info=[
        (
            "weekday",
            ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"],
        ),
        (
            "hour",
            ["00:00-4:00", "4:00-8:00", "8:00-12:00", "12:00-16:00", "16:00-20:00", "20:00-00:00"],
        ),
        (
            "duration_minutes",
            ["0'-20'", "20'-40'", "40'-60'", "60'-80'", "80'-100'", "100'-120'"],
        ),
        # Station buckets are the string labels "0" through "9"; each attribute
        # gets its own list object.
        ("start_station", [str(bucket) for bucket in range(10)]),
        ("end_station", [str(bucket) for bucket in range(10)]),
        ("usertype", ["customer", "subscriber"]),
        ("gender", ["unknown", "male", "female"]),
        ("age", ["0-17", "18-49", "50-64", "65+"]),
    ],
)

def evaluate(query, dp_demand):
    """Evaluate `query` on a freshly created TurboSession and report the budget spent.

    A new session over `citibike_df` is built on every call with
    `PureDPBudget(1)`, `AddMaxRows(2)` and the notebook-level `turbo_config`,
    so budget consumption never carries over between calls.

    Args:
        query: The query to evaluate.
        dp_demand: Forwarded unchanged to `session.evaluate`; the cells in this
            notebook pass either an `Accuracy` or a `PureDPBudget`.

    The result is displayed with `.show()`, followed by the consumed and
    remaining budget. Nothing is returned.
    """
    session_kwargs = dict(
        privacy_budget=PureDPBudget(1),
        source_id="citibike",
        dataframe=citibike_df,
        protected_change=AddMaxRows(2),
        turbo_config=turbo_config,
    )
    session = TurboSession.from_dataframe(**session_kwargs)

    # Snapshot the budget before the query so the difference can be reported.
    budget_before = session.remaining_privacy_budget
    session.evaluate(query, dp_demand).show()
    print_budget(budget_before, session.remaining_privacy_budget)

23/09/24 17:27:03 WARN SparkContext: The path /home/kelly/tumult/spark-warehouse/citibike.csv has been added already. Overwriting of added paths is not supported in the current version.


### Passing an `accuracy` target that doesn't match Turbo's accuracy


In [10]:
# Requested accuracy (0.06, 0.001) differs from turbo_config's alpha=0.05, beta=0.001.
query = (
    QueryBuilder("citibike")
    .filter("gender = 'male'")
    .count()
)
evaluate(query, Accuracy(0.06, 0.001))

  warn(
[Stage 130:>                                                        (0 + 1) / 1]

+------+
| count|
+------+
|224594|
+------+

[31mConsumed Budget: 0.0006965372843140597 
[0m
[32mRemaining Budget: 0.9993034627156869 
[0m



                                                                                

### Passing a `privacy budget` that doesn't match Turbo's accuracy


In [11]:
# Same count query, but the demand is an explicit epsilon instead of an accuracy target.
query = (
    QueryBuilder("citibike")
    .filter("gender = 'male'")
    .count()
)
evaluate(query, PureDPBudget(epsilon=1e-7))

  warn(

+---------+
|    count|
+---------+
|-10923208|
+---------+

[31mConsumed Budget: 1e-07 
[0m
[32mRemaining Budget: 0.9999999000000009 
[0m



                                                                                

### Not supporting Group-bys (for now)


In [12]:
# Group-by count over the three gender values, requesting the accuracy from turbo_config.
genders = KeySet.from_dict(dict(gender=["unknown", "male", "female"]))
query = (
    QueryBuilder("citibike")
    .groupby(genders)
    .count()
)
evaluate(
    query,
    Accuracy(turbo_config["alpha"], turbo_config["beta"]),
)

  warn(
                                                                                

+-------+------+
| gender| count|
+-------+------+
| female| 70856|
|   male|225561|
|unknown| 28285|
+-------+------+

[31mConsumed Budget: 0.0008358447411768716 
[0m
[32mRemaining Budget: 0.9991641552588241 
[0m


### Not supporting binary ops other than `=`

In [13]:
# Count with a `>` filter (not `=`), requesting the accuracy from turbo_config.
query = (
    QueryBuilder("citibike")
    .filter("time > 0")
    .count()
)
evaluate(
    query,
    Accuracy(turbo_config["alpha"], turbo_config["beta"]),
)

  warn(
[Stage 207:>                                                        (0 + 1) / 1]

+-----+
|count|
+-----+
|  818|
+-----+

[31mConsumed Budget: 0.0008358447411768716 
[0m
[32mRemaining Budget: 0.9991641552588241 
[0m



                                                                                

### Not supporting aggregations other than `counts`
For aggregations other than counts, the conversion fails if we pass an `accuracy` target, so we pass an arbitrary `privacy budget` instead.

In [14]:
# Sum aggregation (not a count), evaluated with an explicit epsilon.
query = QueryBuilder("citibike").sum(
    "time",
    low=0,
    high=1,
)
evaluate(query, PureDPBudget(epsilon=0.001))

  warn(

+--------+
|time_sum|
+--------+
|    1376|
+--------+

[31mConsumed Budget: 0.001 
[0m
[32mRemaining Budget: 0.999000000000001 
[0m



                                                                                

### Passed Accuracy in evaluate but aggregation is not Count
The `Accuracy`-to-`privacy budget` conversion cannot simply fall back to Tumult, because Tumult does not support accuracy targets. If the conversion fails, the query fails completely.


In [6]:
# Sum aggregation combined with an accuracy target: this is expected to fail.
query = QueryBuilder("citibike").sum("time", low=0, high=1)
try:
    evaluate(query, Accuracy(turbo_config["alpha"], turbo_config["beta"]))
except ValueError as err:
    # The error is the point of this cell; display it instead of letting it
    # abort a "Restart Kernel -> Run All" before the remaining cells execute.
    print(f"ValueError: {err}")

                                                                                

ValueError: Can't request for accuracy target unless using Count.

### Try to create a TurboSession with a neighboring definition other than `AddMaxRows(2)`

In [16]:
# Creating a session with AddMaxRows(1) instead of AddMaxRows(2) is expected to fail.
try:
    session = TurboSession.from_dataframe(
        privacy_budget=PureDPBudget(1),
        source_id="citibike",
        dataframe=citibike_df,
        protected_change=AddMaxRows(1),
        turbo_config=turbo_config,
    )
except ValueError as err:
    # The error is the point of this cell; display it instead of letting it
    # abort a "Restart Kernel -> Run All".
    print(f"ValueError: {err}")

ValueError: Turbo works only with the ReplaceOneRow definition which Tumult doesn't support.
                             You must use AddMaxRows(2) which entails ReplaceOneRow instead