In [2]:
%load_ext autoreload
%load_ext line_profiler
%autoreload 2
import syft as sy
import numpy as np
import pandas as pd
from syft.core.adp.entity_list import EntityList
from syft.core.adp.data_subject_ledger import DataSubjectLedger
from syft.core.adp.entity import Entity
from syft.core.adp.ledger_store import DictLedgerStore
from time import time
import pyarrow.parquet as pq
from syft.util import size_mb

Loaded constant2epsilon cache of size: (300000,)


In [None]:
# Load the parquet sample into memory and report how long the read took.
# NOTE(review): the path below is an absolute, machine-specific location; anyone
# else running this notebook has to point it at their own copy of the file.
t0 = time()
# pq.read_table returns a pyarrow Table (not a pandas DataFrame), despite the name `df`.
df = pq.read_table("/home/ruchi/1B_rows_dataset_sample.parquet")
tf = time() - t0

print(f"Time taken to read file with {df.shape[0]/1e6} million rows : {tf} seconds")

In [None]:
# Build the two inputs for the private tensor: the (down-scaled) impression counts
# and the list of data subjects, and time how long that takes.
scale = 1000  # This is put here to reduce the size of the cache :)
t0 = time()
# Floor-divide by `scale`; the min/max bounds given to .private() in the next cell
# are divided by the same factor so data and bounds stay in the same units.
impressions = df['impressions'].to_numpy()//scale
# Data subjects are derived from the user_id column.
data_subjects = EntityList.from_series(df['user_id'])
tf = time() - t0

print(f"Time taken to create inputs for Syft Tensor: {tf} seconds")

In [None]:
# Wrap the impressions in a Syft tensor and mark it private, attaching per-value
# bounds (scaled like the data) and the data subjects. Exactly one of the two
# `tweets_data = ...` lines should be active, depending on the dataset size.
t0 = time()
tweets_data = sy.Tensor(impressions).private(min_val=700_000/scale, max_val=20e6/scale, entities = data_subjects,ndept=True)  # Run this for 1 billion rows
# tweets_data = sy.Tensor(impressions).private(min_val=500/scale, max_val=25000/scale, entities = data_subjects,ndept=True)  # Run this for 1 million rows
tf = time() - t0

print(f"Time make Private Syft Tensor: {tf} seconds")

In [3]:
# Log into the local domain node on port 8081.
# NOTE(review): these are the default PySyft admin credentials (the node prints a
# warning about this on login); only acceptable for a local development node.
domain_node = sy.login(email="info@openmined.org",password="changethis",port=8081)


Anyone can login as an admin to your node right now because your password is still the default PySyft username and password!!!

Connecting to localhost... done! 	 Logging into festive_song... done!


In [4]:
domain_node.privacy_budget

9126519.281209879

In [None]:
# from syft.core.node.common.node_service.user_manager.user_messages import (
#     UpdateUserMessage,
# )

# # Upgrade admins budget
# content = {"user_id": 1, "budget": 9_999_999}
# domain_node._perform_grid_request(grid_msg=UpdateUserMessage, content=content)

# domain_node.privacy_budget

In [None]:
%%time
# Upload the private tensor to the domain node as a dataset with a single asset.
# The current timestamp is appended so every upload gets a distinct name; the same
# string is used both as the dataset name and as the asset key.
name = f"Tweets data- {time()}"

domain_node.load_dataset(
    assets={name: tweets_data},
    name=name,
    description=" Tweets- 100M rows",
    use_blob_storage=True
)

In [5]:
domain_node.datasets

Idx,Name,Description,Assets,Id
[0],Tweets data- 1648114820.056691,Tweets- 100M rows,"[""Tweets data- 1648114820.056691""] ->",cf40e48d-72f8-4536-8f9d-9383ca23acf7
[1],Tweets data- 1648123216.9175086,Tweets- 100M rows,"[""Tweets data- 1648123216.9175086""] ->",29d587af-4b62-47c9-a27d-71a026ec2265
[2],Tweets data- 1648124216.139227,Tweets- 100M rows,"[""Tweets data- 1648124216.139227""] ->",fd68f34a-818d-4611-9194-1b8d788f96cf
[3],Tweets data- 1648124958.1514506,Tweets- 100M rows,"[""Tweets data- 1648124958.1514506""] ->",7d03bc3a-901e-43a2-b25e-6c3c7d243aff
[4],Tweets data- 1648131125.3170142,Tweets- 100M rows,"[""Tweets data- 1648131125.3170142""] ->",077a5f8c-264d-410f-b143-495a4c7857a3


In [6]:
# Take the last dataset in the listing and look up its asset by name.
# NOTE(review): the asset key embeds the upload timestamp of one specific run, so
# this string has to be edited by hand whenever the dataset is uploaded again.
data = domain_node.datasets[-1]["Tweets data- 1648131125.3170142"]

In [16]:
%%time
# Request the sum of the remote asset. The result stays on the node; only a
# pointer to it comes back (see the TensorPointer in this cell's output).
sum_result = data.sum()
# presumably waits until the remote computation has finished — TODO confirm
sum_result.block

Loaded constant2epsilon cache of size: (300000,)
CPU times: user 2.52 s, sys: 133 ms, total: 2.65 s
Wall time: 1min 40s


<TensorPointer -> festive_song:29cc4282405a43268de27900dc814201>

In [17]:
sum_result.exists

True

In [18]:
%time
published_result = sum_result.publish(sigma=1e6)
published_result.block 

CPU times: user 6 µs, sys: 2 µs, total: 8 µs
Wall time: 17.4 µs
Please wait we're computing your query ...Completed. 🎉

<FloatPointer -> festive_song:290b74e0c5c54088aa31b407e479e86d>

In [None]:
published_result.exists

In [None]:
# Download the published value from the node and show it.
# delete_obj=False: presumably keeps the remote object so it can be fetched
# again later — TODO confirm against the pointer's .get() documentation.
res = published_result.get(delete_obj=False)
print(res)

In [None]:
# Scratch array used by the following cells to inspect NumPy dtype objects/names.
# np.random.random draws floats in [0, 1), so the int64 cast truncates every
# element to 0 — only the dtype is of interest here, not the values.
a = np.random.random(10).astype(np.int64)

In [None]:
a.dtype

In [None]:
a.dtype.name

In [None]:
np.dtype("int64")

In [None]:
# domain_node.requests

In [None]:
# domain_node.requests[-1].accept()

In [None]:
# domain_node.requests

In [None]:
# result_of_our_hard_labour = published_result.get()

# WE GOT AN OVERFLOW ERROR
- Doesn't occur with 1M rows.
- Publish had completed, though it took 602 seconds.

In [None]:
# result_of_our_hard_labour

In [None]:
# impressions.sum()

In [None]:
def percentage_error(true_value, noisy_value):
    """Return the signed error of ``noisy_value`` relative to ``true_value``, in percent.

    Computed as ``(true_value - noisy_value) / true_value * 100``. For a positive
    ``true_value`` the result is positive when ``noisy_value`` is smaller than the
    true value and negative when it is larger. There is no guard against
    ``true_value == 0``.

    Args:
        true_value: the exact (non-private) value.
        noisy_value: the value to compare against it.

    Returns:
        The relative difference expressed as a percentage.
    """
    difference = true_value - noisy_value
    relative_error = difference / true_value
    return relative_error * 100

In [None]:
# percentage_error(true_value=impressions.sum(), noisy_value=result_of_our_hard_labour)

In [None]:
# (1644527104 - 11941995258)/11941995258 * 100

In [None]:
# 11941995258 /1e6

In [None]:
def calculate_bounds_for_mechanism(min_val_array, max_val_array):
    """Return the L2 norms of the lower-bound and upper-bound arrays.

    Each norm is ``sqrt(sum(x ** 2))`` taken over all elements of the input. These
    are the plain norms, not the squared norms: callers that need the squared
    value have to square the result themselves.

    Privacy note (carried over from the original docstring): a privacy-budget spend
    derived from worst-case bounds may be shown to the data scientist, whereas one
    derived from the actual data values may not, because that would violate
    privacy. The original text referred to a ``public_only`` flag that this
    standalone version does not have.

    Args:
        min_val_array: array-like (or scalar) of lower bounds.
        max_val_array: array-like (or scalar) of upper bounds.

    Returns:
        Tuple ``(l2_norm_min, l2_norm_max)`` of float64 scalars.
    """
    # Promote to float64 before squaring: np.square on an integer array keeps the
    # integer dtype and wraps around silently on overflow, which would produce a
    # wrong (or NaN) norm for large bounds.
    min_vals = np.asarray(min_val_array, dtype=np.float64)
    max_vals = np.asarray(max_val_array, dtype=np.float64)

    l2_norm_min = np.sqrt(np.sum(np.square(min_vals)))
    l2_norm_max = np.sqrt(np.sum(np.square(max_vals)))
    return l2_norm_min, l2_norm_max

In [None]:
def _get_batch_rdp_constants(
    sigma, scale, min_val, max_val, L=1
) -> tuple:
    """Compute the lower and upper constants ``L^2 * ||bound / scale||_2^2 / (2 * sigma^2)``.

    The bounds are first divided by ``scale``, their L2 norms are obtained from
    ``calculate_bounds_for_mechanism``, and each norm is squared and combined with
    ``L`` and ``sigma`` as in the formula above.

    NOTE(review): this looks like a standalone adaptation of a ledger method; the
    earlier commented-out per-entity aggregation (``np.bincount`` over entity ids)
    and the ledger state updates are intentionally not part of this version.

    Args:
        sigma: noise parameter of the mechanism (presumably the standard deviation
            passed to ``publish`` — confirm). Must be non-zero.
        scale: factor the bounds are divided by before the norms are taken.
        min_val: lower bound(s), scalar or array.
        max_val: upper bound(s), scalar or array.
        L: multiplier that enters the constant squared (presumably a Lipschitz
            bound — confirm). Defaults to 1.

    Returns:
        Tuple ``(constant_min, constant_max)``.
    """
    l2_norm_min, l2_norm_max = calculate_bounds_for_mechanism(
        min_val / scale, max_val / scale
    )

    squared_L = L**2
    denominator = 2 * sigma**2

    # Same evaluation order as before: (L^2 * ||.||^2) / (2 * sigma^2).
    constant_min = squared_L * l2_norm_min**2 / denominator
    constant_max = squared_L * l2_norm_max**2 / denominator
    return constant_min, constant_max

In [None]:
#_get_batch_rdp_constants(sigma=100, scale=1000, min_val=700_000, max_val=20e6)

In [None]:
# Set up a local ledger for experimenting with budget accounting outside the node.
# presumably an in-memory, dict-backed store (going by the class name) — TODO confirm
ledger_store = DictLedgerStore()
# Show the store's contents before any ledger has been created in it.
print(ledger_store.kv_store)
# Arbitrary key identifying the user whose ledger is fetched or created.
user_key = b"1231"
ledger = DataSubjectLedger.get_or_create(store=ledger_store, user_key=user_key)

In [None]:
# Sum the locally built private tensor (not the remote asset).
# NOTE(review): this rebinds `res`, which an earlier cell used for the fetched
# published result.
res = tweets_data.sum()

In [None]:
# Unwrap one level of the tensor to reach its `.child`.
# NOTE(review): not idempotent — running this cell again unwraps a further level
# (res.child.child), so re-run the previous cell first.
res =res.child

In [None]:
res.max_val