In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

22/10/16 18:08:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from tmlt.core.transformations.spark_transformations.filter import Filter
from tmlt.core.transformations.spark_transformations.agg import Count

from tmlt.core.measurements.noise_mechanisms import AddGeometricNoise

from tmlt.core.domains.spark_domains import convert_spark_schema, SparkDataFrameDomain

from tmlt.core.metrics import SymmetricDifference

from tmlt.core.utils.misc import print_sdf

In [4]:
df = spark.read.csv('simulated_hospital_data.csv', header=True, inferSchema=True)

In [50]:
import pandas as pd 
import numpy as np

In [None]:
nd = NumpyDomain()

In [5]:
from pyspark.sql.types import *
spark_schema = StructType(
    [StructField("Age", IntegerType())]
)
tumult_schema = SparkDataFrameDomain(convert_spark_schema(spark_schema))

In [6]:
over_18_measurement = (
    Filter(filter_expr="Age >= 18", domain=tumult_schema, metric=SymmetricDifference())
    | Count(input_domain=tumult_schema, input_metric=SymmetricDifference())
    | AddGeometricNoise(2)
)
print("Noisy count of records with age >= 18:")
print(over_18_measurement(df))
print("Privacy loss (epsilon):")
print(over_18_measurement.privacy_function(1))


Noisy count of records with age >= 18:
97
Privacy loss (epsilon):
1/2


# Break it down! 

In [7]:
filter = Filter(filter_expr="Age >= 18", domain=tumult_schema, metric=SymmetricDifference())
count = Count(input_domain=tumult_schema, input_metric=SymmetricDifference())


In [8]:
print(filter.input_domain)
print(filter.output_domain)
print(filter.input_metric)
print(filter.output_metric)

SparkDataFrameDomain(schema={'Age': SparkIntegerColumnDescriptor(allow_null=True, size=32)})
SparkDataFrameDomain(schema={'Age': SparkIntegerColumnDescriptor(allow_null=True, size=32)})
SymmetricDifference()
SymmetricDifference()


In [9]:
print(count.input_domain)
print(count.output_domain)
print(count.input_metric)
print(count.output_metric)

SparkDataFrameDomain(schema={'Age': SparkIntegerColumnDescriptor(allow_null=True, size=32)})
NumpyIntegerDomain(size=64)
SymmetricDifference()
AbsoluteDifference()


In [11]:
filter_and_count = filter | count


In [12]:
print(filter_and_count.input_domain)
print(filter_and_count.output_domain)
print(filter_and_count.input_metric)
print(filter_and_count.output_metric)

SparkDataFrameDomain(schema={'Age': SparkIntegerColumnDescriptor(allow_null=True, size=32)})
NumpyIntegerDomain(size=64)
SymmetricDifference()
AbsoluteDifference()


In [13]:
print(filter_and_count.stability_relation(1,1))
# print(filter_transformation.stability_function(1))

True


In [14]:
print(filter_and_count(df))


100


In [15]:
add_noise = AddGeometricNoise(2)

In [16]:
print(add_noise.input_domain)
print(add_noise.input_metric)
print(add_noise.output_measure)

NumpyIntegerDomain(size=64)
AbsoluteDifference()
PureDP()


In [17]:
over_18_measurement = filter_and_count | add_noise


In [18]:
print(over_18_measurement(df))


101


In [19]:
print(over_18_measurement.privacy_function(1))

1/2


The privacy guarantee says, informally, that if you call this function on similar dataframes, you will get statistically similar noisy counts. The privacy_function quantifies this guarantee precisely. By calling this function with an input of 1, we learn how statistically similar the outputs will be for two dataframes that differ by 1 row. The function return value tells us that the noisy counts satisfy -differential privacy with .

If we call this function with an input of 2 (dataframes differing by 2 rows), we learn how statistically similar the outputs will be for two dataframes that differ by 2 rows. That is, we learn that the group privacy guarantee: the mechanism satisfies -differential privacy for groups of size 2, with .

In [20]:
# group privacy 
print(over_18_measurement.privacy_function(4))

2


In [21]:
from tmlt.core.measurements.noise_mechanisms import AddGeometricNoise, AddLaplaceNoise, AddDiscreteGaussianNoise

In [22]:
from tmlt.core.domains.numpy_domains import NumpyIntegerDomain

In [23]:
gn = AddGeometricNoise(2) # aplha: noise scale # din/alpha
ln = AddLaplaceNoise(NumpyIntegerDomain(32), 2) # din/b
gaun = AddDiscreteGaussianNoise(16) # sigma_squared: noise scale # din^2/(2*sigma^2)

print(gn, ln)

<tmlt.core.measurements.noise_mechanisms.AddGeometricNoise object at 0x12b4ab7c0> <tmlt.core.measurements.noise_mechanisms.AddLaplaceNoise object at 0x12b4ab3a0>


In [24]:
from tmlt.core.utils.parameters import calculate_noise_scale
from tmlt.core.measures import PureDP, ApproxDP, RhoZCDP

# utils.parameters

In [25]:
calculate_noise_scale(4, 8, RhoZCDP())
# d_in: The absolute distance between neighboring inputs. 
# d_out: The desired output measure value. 
# output_measure: RhoZCDP, PureDP the desired privacy gaurantee

1

# converters

In [26]:
from tmlt.core.measurements.converters import PureDPToRhoZCDP, PureDPToApproxDP, RhoZCDPToApproxDP

In [27]:
rhopdp = PureDPToRhoZCDP(gn)
rhopdp.privacy_function(2) # smallest d_out satisified by the measurement p = (epsilon^2)/2
rhopdp.privacy_relation(1, 2)

True

In [28]:
pdp = PureDPToApproxDP(gn)
pdp.privacy_function(2) # smallest d_out satisified by the measurement p = (epsilon^2)/2
pdp.privacy_relation(1, (2, 1))

True

In [29]:
pdp = RhoZCDPToApproxDP(rhopdp)
pdp.privacy_relation(1, (2, 1)) # smallest d_out satisified by the measurement p = (epsilon^2)/2
# pdp.privacy_relation(1, (2, 1))

True

# [interactive measurements](https://docs.tmlt.dev/core/latest/reference/tmlt/core/measurements/interactive_measurements/index.html#module-tmlt.core.measurements.interactive_measurements) 

# transformations 

In [30]:
from tmlt.core.transformations.chaining import ChainTT

In [31]:
tt = ChainTT(filter, count)

In [32]:
tt.transformation1
tt.transformation2

<tmlt.core.transformations.spark_transformations.agg.Count at 0x10fee54f0>

In [33]:
print(tt.input_domain)
print(tt.input_metric) 
print(tt.output_domain) 
print(tt.output_metric)

SparkDataFrameDomain(schema={'Age': SparkIntegerColumnDescriptor(allow_null=True, size=32)})
SymmetricDifference()
NumpyIntegerDomain(size=64)
AbsoluteDifference()


## converters

In [34]:
# to test
from tmlt.core.transformations.converters import UnwrapIfGroupedBy, HammingDistanceToSymmetricDifference

## dictionary

In [35]:
from tmlt.core.transformations.dictionary import create_copy_and_transform_value, create_rename, \
create_apply_dict_of_transformations, create_transform_value, create_transform_all_values

In [36]:
from tmlt.core.transformations.dictionary import CreateDictFromValue, AugmentDictTransformation, \
Subset, GetValue

## identitiy 

In [37]:
from tmlt.core.transformations.identity import Identity 

## [spark_transformations](https://docs.tmlt.dev/core/latest/reference/tmlt/core/transformations/spark_transformations/index.html#module-tmlt.core.transformations.spark_transformations)

In [39]:
# for manipu;atong spark dataframes 
from tmlt.core.transformations.spark_transformations import agg, groupby, join, partition, \
persist, rename, select, add_remove_keys, filter, id, nan, truncation 

# 

# [metrics](https://docs.tmlt.dev/core/latest/reference/tmlt/core/metrics/index.html#module-tmlt.core.metrics)

In [44]:
import numpy as np
from tmlt.core.metrics import Metric, \
                              NullMetric, \
                              ExactNumberMetric, \
                              AbsoluteDifference, \
                              SymmetricDifference, \
                              HammingDistance, \
                              AggregationMetric, \
                              SumOf, \
                              RootSumOfSquared, \
                              OnColumn, \
                              OnColumns, \
                              IfGroupedBy, \
                              DictMetric, \
                              AddRemoveKeys
    
# .distance, .validate, .compare, .supports_domain

In [45]:
AbsoluteDifference().distance(
    np.int64(20), np.int64(82), NumpyIntegerDomain()
)

62

In [46]:
domain = SparkDataFrameDomain(
    {
        "A": SparkIntegerColumnDescriptor(),
    }
)

value1 = spark.createDataFrame(
    pd.DataFrame({"A": [1, 1]}),
)
value2 = spark.createDataFrame(
    pd.DataFrame({"A": [1, 1]})
)

SymmetricDifference().distance(
    value1, value2, domain
)

NameError: name 'SparkIntegerColumnDescriptor' is not defined

In [47]:
# ExactNumberMetric().distance(
#     np.int64(20), np.int64(82), NumpyIntegerDomain()
# )

In [48]:
from pyspark.sql import SparkSession
from tmlt.core.domains.spark_domains import (
    SparkColumnsDescriptor,
    SparkIntegerColumnDescriptor,
)


In [51]:
spark = SparkSession.builder.getOrCreate()
domain = SparkDataFrameDomain(
    {
        "A": SparkIntegerColumnDescriptor(),
        "B": SparkIntegerColumnDescriptor(),
    }
)
df1 = spark.createDataFrame(
    pd.DataFrame({"A": [1, 3, 1, 2, 3], "B": [2, 2, 2, 4, 3]})
)
df2 = spark.createDataFrame(pd.DataFrame({"A": [1, 2, 1], "B": [2, 4, 1]}))
SymmetricDifference().distance(df1, df2, domain)

                                                                                

4

In [58]:
from tmlt.core.domains.spark_domains import SparkGroupedDataFrameDomain
from tmlt.core.utils.grouped_dataframe import GroupedDataFrame 

In [59]:
group_keys = spark.createDataFrame(pd.DataFrame({"B": [1, 2, 4]}))
domain = SparkGroupedDataFrameDomain(
    {
        "A": SparkIntegerColumnDescriptor(),
        "B": SparkIntegerColumnDescriptor(),
    },
    group_keys,
)
grouped_df1 = GroupedDataFrame(df1, group_keys)
grouped_df2 = GroupedDataFrame(df2, group_keys)
SymmetricDifference().distance(grouped_df1, grouped_df2, domain)

                                                                                

4

In [None]:
spark = SparkSession.builder.getOrCreate()
domain = SparkDataFrameDomain(
    {
        "A": SparkIntegerColumnDescriptor(),
        "B": SparkIntegerColumnDescriptor(),
    }
)
value1 = spark.createDataFrame(
    pd.DataFrame({"A": [1, 23], "B": [3, 1]})
)
value2 = spark.createDataFrame(
    pd.DataFrame({"A": [2, 20], "B": [1, 8]})
)
print(OnColumn("A", SumOf(AbsoluteDifference())).distance(value1, value2, domain))

print(OnColumn("B", RootSumOfSquared(AbsoluteDifference())).distance(
    value1, value2, domain
))

In [None]:
spark = SparkSession.builder.getOrCreate()
domain = SparkDataFrameDomain(
    {
        "A": SparkIntegerColumnDescriptor(),
        "B": SparkIntegerColumnDescriptor(),
    }
)
metric = OnColumns(
    [
        OnColumn("A", SumOf(AbsoluteDifference())),
        OnColumn("B", RootSumOfSquared(AbsoluteDifference())),
    ]
)
value1 = spark.createDataFrame(
    pd.DataFrame({"A": [1, 23], "B": [3, 1]})
)
value2 = spark.createDataFrame(
    pd.DataFrame({"A": [2, 20], "B": [1, 8]})
)
metric.distance(value1, value2, domain)


In [55]:
from tmlt.core.domains.collections import DictDomain 

spark = SparkSession.builder.getOrCreate()
metric = DictMetric(
    {"x": AbsoluteDifference(), "y": SymmetricDifference()}
)
domain = DictDomain(
    {
        "x": NumpyIntegerDomain(),
        "y": SparkDataFrameDomain(
            {
                "A": SparkIntegerColumnDescriptor(),
                "B": SparkIntegerColumnDescriptor(),
            }
        ),
    }
)
df1 = spark.createDataFrame(
    pd.DataFrame({"A": [1, 1, 3], "B": [2, 1, 4]})
)
df2 = spark.createDataFrame(pd.DataFrame({"A": [2, 1], "B": [1, 1]}))
value1 = {"x": np.int64(1), "y": df1}
value2 = {"x": np.int64(10), "y": df2}
metric.distance(value1, value2, domain)

                                                                                

{'x': 9, 'y': 3}

In [57]:
# import pandas as pd
# from pyspark.sql import SparkSession
from tmlt.core.domains.spark_domains import (
    SparkIntegerColumnDescriptor,
    SparkStringColumnDescriptor,
)
spark = SparkSession.builder.getOrCreate()
domain = DictDomain(
    {
        1: SparkDataFrameDomain(
            {
                "A": SparkIntegerColumnDescriptor(),
                "B": SparkIntegerColumnDescriptor(),
            },
        ),
        2: SparkDataFrameDomain(
            {
                "C": SparkIntegerColumnDescriptor(),
                "D": SparkStringColumnDescriptor(),
            },
        ),
    }
)
metric = AddRemoveKeys({1: "A", 2: "C"})
# key=1 matches, key=2 is only in value1, key=3 is only in value2, key=4
# differs
value1 = {
    1: spark.createDataFrame(
            pd.DataFrame(
            {
                "A": [1, 1, 2],
                "B": [1, 1, 1],
            }
        )
    ),
    2: spark.createDataFrame(
        pd.DataFrame(
            {
                "C": [1, 4],
                "D": ["1", "1"],
            }
        )
    )
}
value2 = {
    1: spark.createDataFrame(
            pd.DataFrame(
            {
                "A": [1, 1, 3],
                "B": [1, 1, 1],
            }
        )
    ),
    2: spark.createDataFrame(
        pd.DataFrame(
            {
                "C": [1, 4],
                "D": ["1", "2"],
            }
        )
    )
}
metric.distance(value1, value2, domain)

                                                                                

4

In [60]:
from tmlt.core.utils.parameters import calculate_noise_scale

In [61]:
calculate_noise_scale(
    d_in=1,
    d_out=1,
    output_measure=PureDP(),
)

1

In [62]:
calculate_noise_scale(
    d_in=2,
    d_out=1,
    output_measure=PureDP(),
)

2

In [63]:
calculate_noise_scale(
    d_in=1,
    d_out=2,
    output_measure=PureDP(),
)

1/2

In [64]:
calculate_noise_scale(
    d_in=1,
    d_out=1,
    output_measure=RhoZCDP(),
)

sqrt(2)/2

In [65]:
calculate_noise_scale(
    d_in=2,
    d_out=1,
    output_measure=RhoZCDP(),
)

sqrt(2)

In [66]:
calculate_noise_scale(
    d_in=1,
    d_out=2,
    output_measure=RhoZCDP(),
)

1/2

In [68]:
calculate_noise_scale(
    d_in=1,
    d_out=0,
    output_measure=PureDP(),
)


oo