In [65]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lower, col, count, concat_ws
from pyspark.sql.types import Row
from pyspark import RDD
from typing import List, Tuple, Callable, Dict, Optional, Any, NamedTuple
import numpy as np
import scipy.stats as stats

spark = (SparkSession.builder.appName("pacdb")
         .config("spark.executor.memory", "512M")
         .config("spark.sql.warehouse.dir", ".spark")
         .enableHiveSupport()
         .getOrCreate())

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.lines as mlines
import matplotlib.patches as mpatches

# set font to Times New Roman
LATEX = False
if LATEX:
    mpl.rcParams['text.usetex'] = True
    mpl.rcParams["font.family"] = "serif"
    mpl.rcParams["font.serif"] = "Times"
else:
    mpl.rcParams['text.usetex'] = False
    mpl.rcParams["font.family"] = "Times New Roman"

plt.rcParams['svg.fonttype'] = 'none'
mpl.rcParams['savefig.dpi'] = 300

import matplotlib_inline.backend_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')

mpl.rcParams['axes.titleweight'] = 'bold'

24/05/02 02:43:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [66]:
#@markdown Install dependencies and load data

import sys

from IPython.display import clear_output
clear_output()

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [67]:
import pandas as pd

math_df = pd.read_csv("./pipeline_dp/data/student_performance/student-mat.csv", delimiter=";") 
math_df['ID'] = range(1, len(math_df) + 1)

portuguese_df = pd.read_csv("./pipeline_dp/data/student_performance/student-por.csv", delimiter=";")
portuguese_df['ID'] = range(1, len(portuguese_df) + 1)

In [68]:
math_df.head()

Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,Mjob,Fjob,...,freetime,goout,Dalc,Walc,health,absences,G1,G2,G3,ID
0,GP,F,18,U,GT3,A,4,4,at_home,teacher,...,3,4,1,1,3,6,5,6,6,1
1,GP,F,17,U,GT3,T,1,1,at_home,other,...,3,3,1,1,3,4,5,5,6,2
2,GP,F,15,U,LE3,T,1,1,at_home,other,...,3,2,2,3,3,10,7,8,10,3
3,GP,F,15,U,GT3,T,4,2,health,services,...,2,2,1,1,5,2,15,14,15,4
4,GP,F,16,U,GT3,T,3,3,other,other,...,3,2,1,2,5,4,6,10,10,5


In [69]:
portuguese_df.head()

Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,Mjob,Fjob,...,freetime,goout,Dalc,Walc,health,absences,G1,G2,G3,ID
0,GP,F,18,U,GT3,A,4,4,at_home,teacher,...,3,4,1,1,3,4,0,11,11,1
1,GP,F,17,U,GT3,T,1,1,at_home,other,...,3,3,1,1,3,2,9,11,11,2
2,GP,F,15,U,LE3,T,1,1,at_home,other,...,3,2,2,3,3,6,12,13,12,3
3,GP,F,15,U,GT3,T,4,2,health,services,...,2,2,1,1,5,0,14,14,14,4
4,GP,F,16,U,GT3,T,3,3,other,other,...,3,2,1,2,5,0,11,13,13,5


## Sample Query

In [31]:
import pandas as pd
data = {'user': ['A', 'B', 'C'],
        'salary': [10, 20, 15]} 
df = pd.DataFrame(data)

filtered_df = df[(df['salary'] >= 1) & (df['salary'] < 29)]

print(filtered_df)

  user  salary
0    A      10
1    B      20
2    C      15


In [32]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in filtered_df.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [33]:
data_extractors = DataExtractors(
   partition_extractor=lambda row: row.user, # partition is also ID
   privacy_id_extractor=lambda row: row.user, # privacy ID
   value_extractor=lambda row: row.salary) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.LAPLACE,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=10,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [34]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)

print(dp_result)

[]


## GROUP BY Guardian AVG Absences

In [70]:
import pandas as pd

avg_absences_per_guardian = math_df[math_df['guardian'] == 'mother']['absences'].mean()
print(avg_absences_per_guardian)

5.835164835164835


In [71]:
avg_absences_per_guardian = math_df.groupby('guardian')['absences'].mean()

print("Average absences per guardian:")
print(avg_absences_per_guardian)

Average absences per guardian:
guardian
father    3.977778
mother    5.835165
other     9.500000
Name: absences, dtype: float64


#### Create 3 DFs per guardian, and get the avg aggregation of absences per dataset for different values of epsilon

In [72]:
math_df_guardian_mother = math_df[math_df['guardian'] == 'mother']
math_df_guardian_father = math_df[math_df['guardian'] == 'father']
math_df_guardian_other = math_df[math_df['guardian'] == 'other']

In [90]:
avg_absences_per_guardian = math_df.groupby('guardian')['absences'].mean()

print("Average absences per guardian:")
print(avg_absences_per_guardian)

Average absences per guardian:
guardian
father    3.977778
mother    5.835165
other     9.500000
Name: absences, dtype: float64


In [93]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in math_df.iterrows()]
print(rows)

[school             GP
sex                 F
age                18
address             U
famsize           GT3
Pstatus             A
Medu                4
Fedu                4
Mjob          at_home
Fjob          teacher
reason         course
guardian       mother
traveltime          2
studytime           2
failures            0
schoolsup         yes
famsup             no
paid               no
activities         no
nursery           yes
higher            yes
internet           no
romantic           no
famrel              4
freetime            3
goout               4
Dalc                1
Walc                1
health              3
absences            6
G1                  5
G2                  6
G3                  6
ID                  1
Name: 0, dtype: object, school             GP
sex                 F
age                17
address             U
famsize           GT3
Pstatus             T
Medu                1
Fedu                1
Mjob          at_home
Fjob            other
reason 

In [104]:
backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=0.9, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [107]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

def calculate_average_noise_over_n_trials(n, df, epsilon):
   rows = [index_row[1] for index_row in df.iterrows()]

   data_extractors = DataExtractors(
      partition_extractor=lambda row: row.guardian, # partition is guardian
      privacy_id_extractor=lambda row: row.ID, # privacy ID
      value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of absences

   results = []
   # for every epsilon, average the noise over 10 trials
   # results = [[], []....]
   for eps in epsilon:
      result_per_epsilon = []
      for i in range(n):
         backend = pipeline_backend.LocalBackend()
         budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=eps, total_delta=1e-6)
         dp_engine = DPEngine(budget_accountant, backend)

         params = AggregateParams(
            noise_kind=pipeline_dp.NoiseKind.LAPLACE,
            metrics=[pipeline_dp.Metrics.MEAN],
            min_value=0,
            max_value=100,
            max_partitions_contributed=1,
            max_contributions_per_partition=1
         )
         
         dp_result = dp_engine.aggregate(rows, params, data_extractors)
         budget_accountant.compute_budgets()
         dp_result = list(dp_result)

         result_per_epsilon.append(dp_result)

      results.append(result_per_epsilon) 

   return results  


epsilon = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 2.0, 4.0]
n = 10
results = calculate_average_noise_over_n_trials(n, math_df, epsilon)

In [112]:
mother_true = 5.835165
father_true = 3.977778
other_true = 9.500000

# if result is empty, we assume no noise to average the noise over 10 trials
all_avg_noise = []

for result in results:
    avg_noise = []
    # every result is 10 trials result
    # for every result, average up the noise per category
    noise_mother = 0
    noise_father = 0
    noise_other = 0
    for every_dp_result in result:
            
        if every_dp_result:
            # Extract the mean value for guardian
            for guardian, mean in every_dp_result:
                if guardian == 'mother':
                    noise_mother += np.abs(mean.mean - mother_true)
                else:
                    noise_mother += 0

                if guardian == 'father':
                    noise_father += np.abs(mean.mean - mother_true)
                else:
                    noise_father += 0

                if guardian == 'other':
                    noise_other += np.abs(mean.mean - mother_true)
                else:
                    noise_other += 0

    avg_noise_mother = noise_mother / 10
    avg_noise_father = noise_father / 10
    avg_noise_other = noise_other / 10

    avg_noise.append(avg_noise_mother)
    avg_noise.append(avg_noise_father)
    avg_noise.append(avg_noise_other)

    all_avg_noise.append(avg_noise)


In [113]:
print(all_avg_noise)

[[1.1694128043250482, 0.0, 0.0], [4.30532767051362, 0.0, 0.0], [3.22854708628289, 0.7132221161878768, 0.0], [2.1188556829390945, 4.468924879188809, 0.0], [1.5892425427695174, 4.2349377703357405, 0.0], [1.8248581700143531, 5.131439947640051, 0.0], [1.1028275313942018, 4.311807878638314, 0.0], [0.9292278396503058, 5.119847619036629, 0.0], [0.6681523752415138, 2.436102895847193, 0.0], [0.6883566692412856, 3.560430136651088, 1.8601002216220592], [0.35454741807619855, 2.1824538692365847, 5.927983328740984], [0.18335971178546515, 1.6084845523207156, 3.5424529323274925]]


# JOIN 

Join both tables on ID and find the total number of absences as an aggregation of absences

In [35]:
# Non-private query

import pandas as pd

math_and_portuguese = pd.merge(math_df, portuguese_df, on='ID', how='outer')
math_and_portuguese.head()

math_and_portuguese['total_absences'] = math_and_portuguese['absences_x'].fillna(0) + math_and_portuguese['absences_y'].fillna(0)
math_and_portuguese.drop(columns=['absences_x', 'absences_y'], inplace=True)

total_absences_grouped_by_id = math_and_portuguese[['ID', 'total_absences']]

In [36]:
total_absences_grouped_by_id.head()

Unnamed: 0,ID,total_absences
0,1,10.0
1,2,6.0
2,3,16.0
3,4,2.0
4,5,4.0


In [37]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in math_df.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

## Find count of absences per gender

In [38]:
import pandas as pd

absences_per_gender = math_df.groupby('sex')['absences'].count()
print(absences_per_gender)


sex
F    208
M    187
Name: absences, dtype: int64


In [39]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

data_extractors = DataExtractors(
   partition_extractor=lambda row: row.sex, # partition is also ID
   privacy_id_extractor=lambda row: row.ID, # privacy ID
   value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=100,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [40]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)

In [41]:
print(dp_result)

[('F', MetricsTuple(count=204.00675678153172)), ('M', MetricsTuple(count=182.15042488753755))]


## Count of absences per gender - only 1 F record

In [42]:
import pandas as pd

female_records = math_df[math_df['sex'] == 'F']

if not female_records.empty:
    random_female_record = female_records.sample(n=1, random_state=42)  
    df_trial = math_df.drop(female_records.index.difference(random_female_record.index))


In [43]:
import pandas as pd

absences_per_gender = df_trial.groupby('sex')['absences'].count()
print(absences_per_gender)


sex
F      1
M    187
Name: absences, dtype: int64


In [44]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in df_trial.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [45]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

data_extractors = DataExtractors(
   partition_extractor=lambda row: row.sex, # partition is also ID
   privacy_id_extractor=lambda row: row.ID, # privacy ID
   value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=100,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [46]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)

In [47]:
print(dp_result)

[('M', MetricsTuple(count=174.20468192308664))]


 ## Count of absences per gender - 0 F records

In [48]:
# Drop all records with gender as 'F'
df_trial_2 = df_trial[df_trial['sex'] != 'F']

In [49]:
import pandas as pd

absences_per_gender = df_trial_2.groupby('sex')['absences'].count()
print(absences_per_gender)


sex
M    187
Name: absences, dtype: int64


In [50]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in df_trial_2.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [51]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

data_extractors = DataExtractors(
   partition_extractor=lambda row: row.sex, # partition is also ID
   privacy_id_extractor=lambda row: row.ID, # privacy ID
   value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=100,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [52]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)

In [53]:
print(dp_result)

[('M', MetricsTuple(count=183.65996613181704))]


In [54]:
df_trial_3 = math_df[math_df['absences'] > 20]
df_trial_4 = math_df[math_df['absences'] > 30]

df_trial_5 = math_df[math_df['absences'] > 10]

In [55]:
absences_per_gender = df_trial_3.groupby('sex')['absences'].count()
absences_per_gender_ = df_trial_4.groupby('sex')['absences'].count()
absences_per_gender__ = df_trial_5.groupby('sex')['absences'].count()
print(absences_per_gender)
print(absences_per_gender_)
print(absences_per_gender__)

sex
F    12
M     3
Name: absences, dtype: int64
sex
F    4
M    1
Name: absences, dtype: int64
sex
F    37
M    29
Name: absences, dtype: int64


In [56]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in df_trial_3.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [57]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

data_extractors = DataExtractors(
   partition_extractor=lambda row: row.sex, # partition is also ID
   privacy_id_extractor=lambda row: row.ID, # privacy ID
   value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=100,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [58]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)
print(dp_result)

[]


In [59]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in df_trial_4.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [60]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

data_extractors = DataExtractors(
   partition_extractor=lambda row: row.sex, # partition is also ID
   privacy_id_extractor=lambda row: row.ID, # privacy ID
   value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=100,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [61]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)
print(dp_result)

[]


In [62]:
from pipeline_dp import budget_accounting
import pipeline_dp.aggregate_params as aggregate_params
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp.pipeline_backend as pipeline_backend
from tests import dp_engine_test
rows = [index_row[1] for index_row in df_trial_5.iterrows()]

backend = pipeline_backend.LocalBackend()
budget_accountant = budget_accounting.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

In [63]:
import pipeline_dp
from pipeline_dp.aggregate_params import AggregateParams
from pipeline_dp.data_extractors import DataExtractors

data_extractors = DataExtractors(
   partition_extractor=lambda row: row.sex, # partition is also ID
   privacy_id_extractor=lambda row: row.ID, # privacy ID
   value_extractor=lambda row: row.absences) # value to be aggregated - we count the number of IDs

params = AggregateParams(
   noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
   metrics=[pipeline_dp.Metrics.COUNT],
   min_value=0,
   max_value=100,
   max_partitions_contributed=1,
   max_contributions_per_partition=1
)

In [64]:
dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
dp_result = list(dp_result)
print(dp_result)

[('M', MetricsTuple(count=34.12707479077276)), ('F', MetricsTuple(count=35.353199567679965))]
