**Computation of DP Aggregations for Client Profiles and Aggregated Transactions (Mobile Money)**

In [25]:
import pandas as pd
import numpy as np

In [2]:
path= "mobilemoney.csv"

In [26]:
#@markdown Install dependencies

import os
os.chdir('/content')
!pip install pipeline-dp apache_beam
# If you'd like to test the most recent version of PipelineDP, uncomment the
# following lines and comment-out the previous line.
# !git clone https://github.com/OpenMined/PipelineDP.git
# !pip install -r PipelineDP/requirements.dev.txt

import sys
sys.path.insert(0,'/content/PipelineDP')

from IPython.display import clear_output
clear_output()

import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.interactive.interactive_beam import *
from dataclasses import dataclass
import pipeline_dp

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [27]:
#Load and inspect the data
df = pd.read_csv(path)

In [28]:
# Display the column names of the DataFrame
df_columns = df.columns.tolist()
print(df_columns)


['TRANSACTION_ID', 'TRANSACTION_START_DATE', 'TRANSACTION_START_TIME', 'TRANSACTION_TYPE', 'TRANSACTION_ENTRY_TYPE', 'PAYMENT_METHOD_TYPE', 'SERVICE_TYPE', 'TRANSACTION_AMOUNT', 'TRANSACTION_TAX_AMOUNT', 'TRANSACTION_APPROVED_AMOUNT', 'COMMISSION', 'INITIATING_PARTY_ACCOUNT_ID', 'INITIATING_PARTY_MSISDN', 'INITIATING_PARTY_ACCOUNT_TYPE', 'INITIATING_PARTY_ACCOUNT_CATEGORY', 'INITIATING_PARTY_ACCOUNT_CATEGORY_SEGMENT', 'INITIATING_PARTY_ACCOUNT_BALANCE_BEFORE', 'INITIATING_PARTY_ACCOUNT_BALANCE_AFTER', 'RECIPIENT_PARTY_ACCOUNT_ID', 'RECIPIENT_PARTY_MSISDN', 'RECIPIENT_PARTY_ACCOUNT_TYPE', 'RECIPIENT_PARTY_ACCOUNT_CATEGORY', 'RECIPIENT_PARTY_ACCOUNT_CATEGORY_SEGMENT', 'BANK_ID', 'TRANSACTION_STATUS', 'TRANSACTION_ERROR_CODE', 'REQUEST_GATEWAY_TYPE', 'LOCATION_INFO', 'SERVICE_TYPE_KEY', 'RECIPIENT_PARTY_ACCOUNT_CATEGORY_SEGMENT_KEY', 'FAILURE_TYPE_KEY', 'COUNTRY_KEY', 'SOURCE_SYSTEM_KEY', 'FILEID', 'LOAD_DATE', 'USER_TYPE', 'PARTY_ACCESS_ID', 'UNIT_PRICE', 'TRANSFER_PROFILE_DETAILS_ID', '

**Differentially Private Client Profiles**

In [41]:
import pipeline_dp
from pipeline_dp import DPEngine, BudgetAccountant, AggregateParams, Metrics, DataExtractors, NaiveBudgetAccountant, LocalBackend

# Grouping by "SERVICE_TYPE" and account ID to count transactions
transaction_counts = df.groupby(['SERVICE_TYPE', 'INITIATING_PARTY_ACCOUNT_ID']).size().reset_index(name='counts')

# Convert transaction_counts DataFrame to list of dicts
data = transaction_counts.to_dict(orient='records')

# Setup the differential privacy engine
backend = LocalBackend()
budget_accountant = NaiveBudgetAccountant(total_epsilon=1.0, total_delta=1e-6)
dp_engine = DPEngine(budget_accountant, backend)

# Define the data extractors
data_extractors = DataExtractors(
    partition_extractor=lambda x: x['SERVICE_TYPE'],
    privacy_id_extractor=lambda x: x['INITIATING_PARTY_ACCOUNT_ID'],
    value_extractor=lambda x: x['counts']
)

# Define aggregate parameters for computing count and sum
params = AggregateParams(
    metrics=[Metrics.COUNT, Metrics.SUM, Metrics.MEAN, Metrics.VARIANCE],
    max_partitions_contributed=2,
    max_contributions_per_partition=10,
    min_value=0,
    max_value=42168  # Max transaction count
)

# Compute differentially private results
dp_result = dp_engine.aggregate(data, params, data_extractors)
budget_accountant.compute_budgets()

# Displaying the differentially private client profile statistics
print("Differentially Private Client Profiles:")
for result in list(dp_result):
    print(result)


Differentially Private Client Profiles:
('AUTOO2C', MetricsTuple(variance=-6519231975.436259, count=91.25002337584738, sum=-5547656.598814623, mean=-60796.22112494726))
('CASHIN', MetricsTuple(variance=27457516.41869557, count=10052.805005340953, sum=-1364325.2261735392, mean=-135.7158748676302))
('CASHOUTPAS', MetricsTuple(variance=26203703.952795804, count=4915.317009570077, sum=738732.8398997145, mean=150.29200323425903))
('C2CREQ', MetricsTuple(variance=175799868.99099442, count=210.0842174270656, sum=1121958.5275041321, mean=5340.517918218391))
('CCPSFREQ', MetricsTuple(variance=-53220186.75329572, count=3685.863283081446, sum=-1783254.7014179914, mean=-483.80923665925))
('P2P', MetricsTuple(variance=-170376414.1642229, count=932.8465637186309, sum=-4397121.754158224, mean=-4713.660236502197))
('MERCHPAY', MetricsTuple(variance=-1800107836.3088994, count=52.41569602256641, sum=-1527304.6971807238, mean=-29138.308046566373))


**Differentially Private Aggregated Transactions**

In [42]:
import pipeline_dp
from pipeline_dp import DPEngine, BudgetAccountant, AggregateParams, Metrics, DataExtractors, NaiveBudgetAccountant, LocalBackend
import math

# Parse dates and time, and create additional time-related columns
df['TRANSACTION_START_DATE'] = pd.to_datetime(df['TRANSACTION_START_DATE'])
df['month'] = df['TRANSACTION_START_DATE'].dt.month
df['day'] = df['TRANSACTION_START_DATE'].dt.dayofweek
df['hour'] = pd.to_datetime(df['TRANSACTION_START_TIME'], format='%H:%M:%S').dt.hour

# Convert DataFrame to list of dicts
data = df[['SERVICE_TYPE', 'month', 'day', 'hour', 'TRANSACTION_ID', 'TRANSACTION_AMOUNT']].to_dict(orient='records')

# Setup the differential privacy engine
backend = LocalBackend()
budget_accountant = NaiveBudgetAccountant(total_epsilon=1.0, total_delta=1e-5)  # Privacy budget
dp_engine = DPEngine(budget_accountant, backend)

# Define the data extractors
data_extractors = DataExtractors(
    partition_extractor=lambda x: (x['SERVICE_TYPE'], x['month'], x['day'], x['hour']),
    privacy_id_extractor=lambda x: x['TRANSACTION_ID'],
    value_extractor=lambda x: x['TRANSACTION_AMOUNT']
)

# Define aggregate parameters
params = AggregateParams(
    metrics=[Metrics.COUNT, Metrics.SUM, Metrics.MEAN, Metrics.VARIANCE],
    max_partitions_contributed=2,  # Reduced partition contributions
    max_contributions_per_partition=2,  # Reduced contributions per partition
    min_value=0,  # Minimum
    max_value=1000000000  # Maximum transaction value
)

# Compute differentially private results
dp_result = dp_engine.aggregate(data, params, data_extractors)
budget_accountant.compute_budgets()

# Sort the results by service_info before displaying
sorted_dp_result = sorted(dp_result, key=lambda x: x[0])  # Sorting by service_info tuple

# Displaying results
print("Differentially Private Aggregated Transactions:")
for result in sorted_dp_result:
    service_info, metrics = result
    variance = metrics.variance if metrics.variance >= 0 else 0  # Ensure non-negative variance
    stddev = math.sqrt(variance)  # Compute standard deviation
    print(f"Service: {service_info}, Count: {metrics.count}, Sum: {metrics.sum}, Mean: {metrics.mean}, StdDev: {stddev}, Variance: {variance}")


Differentially Private Aggregated Transactions:
Service: ('CASHIN', 3, 4, 7), Count: 326.43167237372836, Sum: 22688869153.31731, Mean: 69505722.2490992, StdDev: 0.0, Variance: 0
Service: ('CASHIN', 3, 4, 8), Count: 636.7257506715541, Sum: 150528834242.30832, Mean: 236410784.5230787, StdDev: 296564613.5643319, Variance: 8.79505700185615e+16
Service: ('CASHIN', 3, 4, 9), Count: 1514.4910971408535, Sum: 323296938407.5674, Mean: 213469025.34977365, StdDev: 304058206.6777869, Variance: 9.245139304811176e+16
Service: ('CASHIN', 3, 4, 10), Count: 720.5506549464772, Sum: 135194452651.59798, Mean: 187626576.59597892, StdDev: 267114244.49310893, Variance: 7.135001961112437e+16
Service: ('CASHIN', 3, 4, 11), Count: 915.3698435143742, Sum: 171029577253.96835, Mean: 186842049.10808015, StdDev: 292198478.39800423, Variance: 8.537995077810896e+16
Service: ('CASHIN', 3, 4, 12), Count: 684.3125696550997, Sum: 153328040804.06546, Mean: 224061412.2890426, StdDev: 377790187.2057453, Variance: 1.4272542554

**Parameter Tuning for Client Profiles**

In [47]:
import pipeline_dp
from dataclasses import dataclass

# Define possible values for hyperparameters
noise_options = [pipeline_dp.NoiseKind.LAPLACE, pipeline_dp.NoiseKind.GAUSSIAN]
max_partitions_options = [1, 2, 5]
max_contributions_options = [1, 3, 5]

@dataclass
class HyperParameters:
    noise_kind: pipeline_dp.NoiseKind
    max_partitions_contributed: int
    max_contributions_per_partition: int

def evaluate_parameters(data, backend, hyper_params, total_epsilon=1.0, total_delta=1e-5):
    # Reinitialize the budget accountant for each evaluation
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=total_epsilon, total_delta=total_delta)

    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x['SERVICE_TYPE'],
        privacy_id_extractor=lambda x: x['TRANSACTION_ID'],
        value_extractor=lambda x: x['TRANSACTION_AMOUNT']
    )
    params = pipeline_dp.AggregateParams(
        metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],
        max_partitions_contributed=hyper_params.max_partitions_contributed,
        max_contributions_per_partition=hyper_params.max_contributions_per_partition,
        min_value=0,
        max_value=1000000000
    )
    # Compute DP results
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    # Finalize the budget accountant for this evaluation
    budget_accountant.compute_budgets()

    # Correctly extract values from the result, which is presumably a list of tuples
    return np.var([metrics.sum for (_, metrics) in dp_result])

# Adjust the call in the main evaluation loop
results = []
for noise in noise_options:
    for max_parts in max_partitions_options:
        for max_contribs in max_contributions_options:
            hyper_params = HyperParameters(noise_kind=noise,
                                           max_partitions_contributed=max_parts,
                                           max_contributions_per_partition=max_contribs)
            variance = evaluate_parameters(data, backend, hyper_params)
            results.append((hyper_params, variance))

# Find the parameters with the minimum variance
best_params = min(results, key=lambda x: x[1])
print("Best HyperParameters:", best_params)


Best HyperParameters: (HyperParameters(noise_kind=<NoiseKind.GAUSSIAN: 'gaussian'>, max_partitions_contributed=1, max_contributions_per_partition=5), 1.522208203784179e+24)


**Parameter Tuning for Aggregated Transactions**

In [44]:
import pipeline_dp
from dataclasses import dataclass

# Define hyperparameters data class
@dataclass
class HyperParameters:
    noise_kind: pipeline_dp.NoiseKind
    max_partitions_contributed: int
    max_contributions_per_partition: int

# Possible values for hyperparameters
noise_options = [pipeline_dp.NoiseKind.LAPLACE, pipeline_dp.NoiseKind.GAUSSIAN]
max_partitions_options = [1, 2, 5]
max_contributions_options = [1, 3, 5]


In [45]:
def evaluate_aggregated_transactions(data, backend, hyper_params, total_epsilon=1.0, total_delta=1e-6):
    # Initialize budget accountant for each evaluation
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=total_epsilon, total_delta=total_delta)

    # Initialize DP engine
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: (x['SERVICE_TYPE'], x['month'], x['day'], x['hour']),
        privacy_id_extractor=lambda x: x['TRANSACTION_ID'],
        value_extractor=lambda x: x['TRANSACTION_AMOUNT']
    )

    # Aggregate parameters definition
    params = pipeline_dp.AggregateParams(
        metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN, pipeline_dp.Metrics.VARIANCE],
        max_partitions_contributed=hyper_params.max_partitions_contributed,
        max_contributions_per_partition=hyper_params.max_contributions_per_partition,
        min_value=0,
        max_value=1000000000  # Adjust based on your data
    )

    # Run DP aggregation
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()

    # Return a metric for evaluation, e.g., variance of means
    return np.var([metrics.mean for (_, metrics) in dp_result])


In [46]:
results = []
for noise in noise_options:
    for max_parts in max_partitions_options:
        for max_contribs in max_contributions_options:
            hyper_params = HyperParameters(noise_kind=noise,
                                           max_partitions_contributed=max_parts,
                                           max_contributions_per_partition=max_contribs)
            variance = evaluate_aggregated_transactions(data, backend, hyper_params)
            results.append((hyper_params, variance))

# Find the best parameters
best_params = min(results, key=lambda x: x[1])
print("Best HyperParameters for Aggregated Transactions:", best_params)


Best HyperParameters for Aggregated Transactions: (HyperParameters(noise_kind=<NoiseKind.GAUSSIAN: 'gaussian'>, max_partitions_contributed=5, max_contributions_per_partition=1), 2.239932056391873e+16)
