### Custom virtualenv setup

This notebook sidesteps JupyterHub and manually creates a SparkSession that distributes a custom virtualenv to the workers. The virtualenv was created as follows:

```
$ cd $HOME/pyspark_dp_beta
$ python -m venv venv
$ source venv/bin/activate
$ pip install findspark jupyter python-dp numpy
$ tar cvfz venv.tar.gz venv
```
Start a Jupyter server from the virtual environment with:

```
$ jupyter notebook --no-browser
```

Warning: this approach is meant as a test and deviates from the Newpyter guidelines: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Jupyter-SWAP

In [None]:
import os

# Path to the packed virtualenv built in the setup steps. The '#venv' fragment is the
# directory name the archive is unpacked under on each YARN container (it is referenced
# later as ./venv/... in PYSPARK_PYTHON).
# Assumes $HOME/pyspark_dp_beta/venv.tar.gz exists.
home_dir = os.environ['HOME']
venv = os.path.join(home_dir, 'pyspark_dp_beta/venv.tar.gz#venv')

In [None]:
import findspark
from dataclasses import dataclass
import numpy as np
import random
import math
from IPython.display import Latex

In [None]:
# Point findspark at the cluster's Spark install, then build a YARN-backed SparkSession
# whose executors run Python from the shipped virtualenv archive (`venv`).
SPARK_HOME = os.environ.get("SPARK_HOME", "/usr/lib/spark2")
findspark.init(SPARK_HOME)
from pyspark.sql import SparkSession
app_name = 'pyspark-medium — differential privacy pydp — htriedman'

# The archive is unpacked into ./venv on each container, and the tarball itself holds a
# top-level venv/ directory (`tar cvfz venv.tar.gz venv`), hence the doubled path.
os.environ['PYSPARK_PYTHON'] = './venv/venv/bin/python'
spark = (
        SparkSession.builder
        .appName(app_name)
        # The 'yarn-client' master URL has been deprecated since Spark 2.0; use master
        # 'yarn' with an explicit client deploy mode instead.
        .master('yarn')
        .config('spark.submit.deployMode', 'client')
        .config('spark.driver.maxResultSize', '2048M')
        .config('spark.dynamicAllocation.maxExecutors', '64')
        .config('spark.executor.memory', '8g')
        .config('spark.executor.cores', 4)
        .config('spark.sql.shuffle.partitions', 256)
        .config('spark.yarn.dist.archives', venv)
        .getOrCreate()
)

In [None]:
# Fetch one hour of pageviews (2021-08-15, hour 6 UTC) with the fields needed to
# bound contributions (actor_signature, page_id) and to group the counts
# (project, country, page_id, page_title).
pageview_query = """
SELECT
  pageview_info['page_title'] as page_title,
  page_id,
  pageview_info['project'] as project,
  geocoded_data['country'] as country,
  actor_signature
FROM wmf.pageview_actor
WHERE year = 2021 AND month = 8 AND day = 15 AND hour = 6 AND page_id IS NOT NULL
"""
pageview_df = spark.sql(pageview_query)
rdd = pageview_df.rdd

In [None]:
# add laplace noise to a single number
def add_laplace_noise(x, eps, sensitivity):
    """Return `x` plus one sample from pydp's Laplace distribution.

    Args:
        x: the value to perturb.
        eps: epsilon passed to pydp.distributions.LaplaceDistribution.
        sensitivity: sensitivity passed to the same constructor.
    """
    # Imported inside the function, presumably so the import happens in whichever
    # process (driver or executor) actually evaluates it.
    import pydp
    laplace = pydp.distributions.LaplaceDistribution(eps, sensitivity)
    noise = laplace.sample()
    return x + noise

# add laplace noise to a spark rdd
def add_laplace_noise_to_rdd(rdd, eps, max_partitions, max_per_partition):
    """Add Laplace noise to the value of every (key, value) pair in `rdd`.

    The budget `eps` is split evenly across `max_partitions`; each value gets noise with
    that per-partition epsilon and sensitivity `max_per_partition`.

    Returns:
        A new RDD of (key, noisy_value) pairs.
    """
    per_partition_eps = eps / max_partitions

    def _noisy_pair(pair):
        return (pair[0], add_laplace_noise(pair[1], per_partition_eps, max_per_partition))

    return rdd.map(_noisy_pair)

In [None]:
# add gaussian noise to a single number
def add_gaussian_noise(x, eps, delta, sensitivity):
    """Return `x` plus one sample of Gaussian noise calibrated for (eps, delta).

    Uses the classic Gaussian-mechanism calibration
        sigma = sqrt(2 * ln(1.25 / delta)) * sensitivity / eps.
    NOTE(review): that calibration is usually stated for eps < 1 — confirm it holds for
    the per-partition epsilon this is called with.

    Args:
        x: the value to perturb.
        eps: epsilon for this single release.
        delta: delta for this single release (must be > 0).
        sensitivity: sensitivity of `x`.
    """
    import pydp
    variance = (2 * math.log(1.25 / delta) * sensitivity**2) / (eps**2)
    # pydp's GaussianDistribution takes the standard deviation, not the variance.
    stddev = math.sqrt(variance)
    return x + pydp.distributions.GaussianDistribution(stddev).sample()

# add gaussian noise to a spark rdd
def add_gaussian_noise_to_rdd(rdd, eps, delta, max_partitions, max_per_partition):
    """Add Gaussian noise to the value of every (key, value) pair in `rdd`.

    `eps` is split evenly across `max_partitions`; each value gets noise calibrated with
    that per-partition epsilon, `delta`, and sensitivity `max_per_partition`.
    NOTE(review): unlike eps, delta is not divided by max_partitions, so under basic
    composition the overall delta may be up to max_partitions * delta — confirm intended.

    Returns:
        A new RDD of (key, noisy_value) pairs.
    """
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    return rdd.map(lambda x: (x[0], add_gaussian_noise(x[1], eps_per_partition, delta, sensitivity_per_partition)))

In [None]:
def calculate_threshold(eps, delta, max_partitions, max_per_partition):
    """Return the minimum noisy count a group needs in order to be released.

    The budget is split evenly over `max_partitions`, giving the Laplace scale
        b = max_per_partition / (eps / max_partitions),
    and the threshold is -b * ln(2 * b * delta).
    NOTE(review): confirm this formula against the intended partition-selection analysis.

    Args:
        eps: total privacy budget epsilon.
        delta: privacy parameter delta.
        max_partitions: max number of pages one actor may contribute to.
        max_per_partition: max pageviews one actor may contribute to one page.

    Returns:
        The threshold as a float. math.log raises ValueError unless 2 * b * delta > 0.
    """
    per_partition_eps = eps / max_partitions
    scale = max_per_partition / per_partition_eps
    log_term = math.log(2 * scale * delta)
    return -scale * log_term

In [None]:
# do bounded DP count
def do_count(rdd, eps, delta, max_partitions, max_per_partition, noise_kind, top_n=200):
    """Differentially private pageview counts per (project, country, page_id, page_title).

    Steps:
      1. Bound each actor's contribution to at most `max_partitions` pages and at most
         `max_per_partition` pageviews per page (random subsets).
      2. Count the bounded pageviews per group.
      3. Add Laplace or Gaussian noise to each count.
      4. Drop groups whose noisy count is below `calculate_threshold(...)`.
      5. Return the `top_n` largest rounded noisy counts.

    Args:
        rdd: RDD of rows exposing actor_signature, page_id, project, country, page_title.
        eps: total privacy budget epsilon.
        delta: privacy parameter delta (used by Gaussian noise and the threshold).
        max_partitions: max distinct pages per actor (>= 1).
        max_per_partition: max pageviews per actor per page (>= 1).
        noise_kind: "laplace" or "gaussian".
        top_n: number of results to return (default 200).

    Returns:
        List of ((project, country, page_id, page_title), count) sorted by descending
        count, with at most `top_n` entries; counts are ints.

    Raises:
        ValueError: if `noise_kind` is unknown or a contribution bound is < 1.
    """
    # Fail fast: an unrecognized noise_kind must never fall through to releasing raw counts.
    if noise_kind not in ("laplace", "gaussian"):
        raise ValueError(f"noise_kind must be 'laplace' or 'gaussian', got {noise_kind!r}")
    # reduceByKey only calls the sampling lambda for keys with 2+ values, so each key's
    # initial 1-element list must already satisfy the bound.
    if max_partitions < 1 or max_per_partition < 1:
        raise ValueError("max_partitions and max_per_partition must both be >= 1")

    # ((actor_signature, page_id), [pageview])
    per_page = rdd.map(lambda row: ((row.actor_signature, row.page_id), [row]))

    # Keep a random subset of at most `max_per_partition` pageviews per (actor, page).
    per_page = per_page.reduceByKey(
        lambda a, b: random.sample(a + b, min(len(a) + len(b), max_per_partition)))

    # Rekey to the actor and wrap each page's pageview list in another list, so the next
    # reduce samples whole pages rather than individual pageviews.
    # (actor_signature, [[pageview]])
    per_actor = per_page.map(lambda kv: (kv[0][0], [kv[1]]))

    # Keep a random subset of at most `max_partitions` pages per actor, i.e. at most
    # max_partitions * max_per_partition pageviews per actor.
    per_actor = per_actor.reduceByKey(
        lambda a, b: random.sample(a + b, min(len(a) + len(b), max_partitions)))

    # Drop the actor key and flatten pages -> pageviews.
    bounded_views = per_actor.flatMap(lambda kv: [view for page in kv[1] for view in page])

    # Contributions are now bounded; count views per group.
    counts = (
        bounded_views
        .map(lambda row: ((row.project, row.country, row.page_id, row.page_title), 1))
        .reduceByKey(lambda a, b: a + b)
    )

    if noise_kind == "laplace":
        noisy = add_laplace_noise_to_rdd(counts, eps, max_partitions, max_per_partition)
    else:  # "gaussian" (validated above)
        noisy = add_gaussian_noise_to_rdd(counts, eps, delta, max_partitions, max_per_partition)

    # Computed once on the driver with the (eps, delta, ...) argument order that
    # calculate_threshold expects.
    # NOTE(review): the threshold is derived from the Laplace scale and is applied to both
    # noise kinds — confirm it is appropriate for Gaussian noise.
    threshold = calculate_threshold(eps, delta, max_partitions, max_per_partition)
    released = noisy.filter(lambda kv: kv[1] >= threshold)

    # round() on a float with no ndigits returns an int in Python 3.
    released = released.map(lambda kv: (kv[0], round(kv[1])))

    return released.takeOrdered(top_n, key=lambda kv: -kv[1])

In [None]:
# Contribution bounds. The total contribution per actor (aka sensitivity) is
# max_per_partition * max_partitions pageviews.
max_partitions = 5      # an actor may contribute to at most 5 pages...
max_per_partition = 2   # ...and at most 2 pageviews to each of those pages

# Privacy budget.
eps, delta = 1, 5e-8

In [None]:
do_count(
    rdd,
    eps=eps,
    delta=delta,
    max_partitions=max_partitions,
    max_per_partition=max_per_partition,
    noise_kind="laplace",
)

In [None]:
do_count(
    rdd,
    eps=eps,
    delta=delta,
    max_partitions=max_partitions,
    max_per_partition=max_per_partition,
    noise_kind="gaussian",
)

In [None]:
# Display the driver's module search path, to check where pydp is imported from.
import sys

list(sys.path)

In [None]:
# ^^ Note that in the output above, `/srv/home/htriedman/PyDP/src` is included along with several
# conda-related paths. The directory containing PyDP isn't being sent to the worker nodes with the
# rest of the environment.