In [None]:
# install poetry package manager

!conda install -c conda-forge poetry

In [None]:
# install pydp to the local conda environment

!cd ../PyDP && poetry install && cd ../pyspark_dp_beta

# General Spark Notes

Two things to consider:
- memory
- number of cores

Sizeable cluster — wmfdata gives templates of clusters you can define
what happens on spark submit is:
1. project gets bundled and shipped to hadoop
2. on top of hadoop a spark cluster is created
3. data gets split up and shipped to workers
3. workers do computation and send results back to leader

Divide total slice of input data by number of machines you want to have
Don't want data to spill to disc — will cause a performance hit, but won't hurt the cluster

wmfdata templates:
- `yarn-large`: up to 128 workers with 8 gb of RAM and 4 threads (e.g. 4 JVMs/tasks of work) each (2gb of RAM / core) — maximum number of workers is 128 * 4 = 512 threads , each one can handle up to 2 gb of data (actually less because there's some overhead). Theoretically if there was no overhead, the `yarn-large` cluster could handle up to 1 tb of data. Spark optimizer also does data partitioning to process underlying data in chunks so it doesn't need to hold all of the data at the same time.
- `yarn-regular`: half the size of `yarn-large`

dynamic allocation: Guaranteed an upper bound of resources, but if there are higher priority jobs workers can be reallocated, which causes weird behavior. Usually at the beginning of the month there are lots of high priority jobs.

Can get larger spark clusters, but need to ask engineering first — "we're doing an experiment, can we use more?"

Can then define a larger cluster in wmfdata

People to ask: Andrew Otto, Luca Toscano, Joseph Allemandou

Another element is how job is written, etc:

When spark ships job to executor, the execution planner tries to conduct computations on the same machine that contains the data. "Shuffling" is when a computation on worker C relies on data from workers A & B, and data has to be transmitted (slowly) across the network. Generally not a good practice.



In [2]:
import wmfdata
from dataclasses import dataclass
import numpy as np
import random
import math
from IPython.display import Latex

In [4]:
spark = wmfdata.spark.get_session(
    app_name='pyspark-large — differential privacy pydp — htriedman',
    type='yarn-large',
    ship_python_env=True
)

Collecting packages...
Packing environment at '/home/htriedman/.conda/envs/2021-06-30T22.51.28_htriedman' to 'conda-2021-06-30T22.51.28_htriedman.tgz'
[########################################] | 100% Completed |  1min 15.5s


Will ship conda-2021-06-30T22.51.28_htriedman.tgz to remote Spark executors.
PySpark executors will use conda-2021-06-30T22.51.28_htriedman/bin/python3.


In [6]:
# get (page title, page id, project, country, actor signature) for Aug 15 2021 UTC6:00

# TODO: change this to work with spark SQL, rather than RDD
# TODO: python functions --> UDFs

rdd = spark.sql("""
SELECT
  pageview_info['page_title'] as page_title,
  page_id,
  pageview_info['project'] as project,
  geocoded_data['country'] as country,
  actor_signature
FROM wmf.pageview_actor
WHERE
    year = 2021 AND month = 8 AND day = 15 -- AND hour = 6
    AND page_id IS NOT NULL
""").rdd

In [7]:
# add laplace noise to a single number
def add_laplace_noise(x, eps, sensitivity):
    import pydp
    return x + pydp.distributions.LaplaceDistribution(eps, sensitivity).sample()

# add laplace noise to a spark rdd
def add_laplace_noise_to_rdd(rdd, eps, max_partitions, max_per_partition):
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    return rdd.map(lambda x: (x[0], add_laplace_noise(x[1], eps_per_partition, sensitivity_per_partition)))

In [8]:
# add gaussian noise to a single number
def add_gaussian_noise(x, eps, delta, sensitivity):
    import pydp
    sigma_squared = (2 * math.log(1.25 / delta) * sensitivity**2) / (eps**2)
    return x + pydp.distributions.GaussianDistribution(sigma_squared).sample()

# add laplace noise to a spark rdd
def add_gaussian_noise_to_rdd(rdd, eps, delta, max_partitions, max_per_partition):
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    return rdd.map(lambda x: (x[0], add_gaussian_noise(x[1], eps_per_partition, delta, sensitivity_per_partition)))

In [9]:
def calculate_threshold(eps, delta, max_partitions, max_per_partition):
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    b = sensitivity_per_partition / eps_per_partition
    return -b * math.log(2 * b * delta)

In [16]:
# TODO: reduceByKey is generally an expensive operation, lots of data points spread around the cluster, need to combine them all
# TODO: optimization would largely come from using spark SQL rather than RDDs

# do bounded DP count
def do_count(rdd, eps, delta, max_partitions, max_per_partition, noise_kind):
    # rekey to a tuple of (actor signature, page id)
    # ((actor_signature, page_id), pageview)
    dp_count_rdd = rdd.map(lambda x: ((x.actor_signature, x.page_id), [x]))

    # randomly get a set of at most `max_per_partition` pageviews for each (actor signature, page id) tuple
    # ((actor_signature, page_id), [pageview]) {max length of max_per_partition}
    dp_count_rdd = dp_count_rdd.reduceByKey(lambda x, y: random.sample(x + y, min(len(x) + len(y), max_per_partition)))

    # rekey to just actor signature
    # (actor_signature, [pageview]) {with redundancies}
    dp_count_rdd = dp_count_rdd.map(lambda x: ((x[0][0], x[1])))

    # randomly get a set of at most `max_partitions` sets of pageviews for each actor signature
    # (actor_signature, [pageview]) {max length of max_per_partition * max_partitions}
    dp_count_rdd = dp_count_rdd.reduceByKey(lambda x, y: random.sample(x + y, min(len(x) + len(y), max_partitions)))

    # drop actor signature as key
    # ([pageview])
    dp_count_rdd = dp_count_rdd.map(lambda x: x[1])

    # unnest lists of pageviews using a flatmap
    # (pageview)
    dp_count_rdd = dp_count_rdd.flatMap(lambda x: x)

    # now that contributions are bounded, count views per tuple
    dp_count_rdd = dp_count_rdd.map(lambda x: ((x.project, x.country, x.page_id, x.page_title), 1))
    dp_count_rdd = dp_count_rdd.reduceByKey(lambda x, y: (x + y))

    if noise_kind == "laplace":
        # add laplace noise to counts
        dp_count_rdd = add_laplace_noise_to_rdd(dp_count_rdd, eps, max_partitions, max_per_partition)
    elif noise_kind == "gaussian":
        dp_count_rdd = add_gaussian_noise_to_rdd(dp_count_rdd, eps, delta, max_partitions, max_per_partition)

    # filter tuples that have less than `min_number_of_views` views
    dp_count_rdd = dp_count_rdd.filter(lambda x: x[1] >= calculate_threshold(delta, eps, max_partitions, max_per_partition))

    # round view count to integers for readability
    dp_count_rdd = dp_count_rdd.map(lambda x: (x[0], round(x[1], 0)))

    return dp_count_rdd.takeOrdered(200, key=lambda x: -x[1])

In [19]:
# total contributions (aka sensitivity) = max_per_partition * max_partitions
max_partitions = 5    # say that users can visit at most 5 pages
max_per_partition = 2 # and for each page they can contribute at most 2 pageviews

eps = 1
delta = 1 / (7e8 * math.sqrt(7e8))

In [17]:
do_count(rdd, eps, delta, max_partitions, max_per_partition, "laplace")

doing first rekey
doing first reduceByKey
doing second rekey
doing second reduceByKey
doing third rekey
flat map
doing thrid reduceByKey (actual count)
adding noise
filtering to threshold
rounding
taking top 200


[(('en.wikipedia', 'United States', 15580374, 'Main_Page'), 1568856.0),
 (('de.wikipedia', 'Germany', 5248757, 'Wikipedia:Hauptseite'), 540537.0),
 (('en.wikipedia', 'United Kingdom', 15580374, 'Main_Page'), 455495.0),
 (('ja.wikipedia', 'Japan', 253348, 'メインページ'), 334953.0),
 (('en.wikipedia', 'India', 15580374, 'Main_Page'), 272602.0),
 (('fr.wikipedia', 'France', 10635368, 'Wikipédia:Accueil_principal'),
  252129.0),
 (('de.wikipedia', 'Germany', 72746, 'Gerd_Müller'), 221146.0),
 (('en.wikipedia', 'India', 3349824, 'Vikram_Batra'), 216939.0),
 (('en.wikipedia', 'India', 30635, 'Taliban'), 214530.0),
 (('en.wikipedia', 'United States', 30635, 'Taliban'), 207496.0),
 (('de.wikipedia', 'Germany', 5070, 'Taliban'), 174477.0),
 (('en.wikipedia', 'Canada', 15580374, 'Main_Page'), 150612.0),
 (('fa.wikipedia', 'Iran', 27859, 'طالبان'), 148399.0),
 (('ru.wikipedia', 'Russia', 71896, 'Талибан'), 143396.0),
 (('ja.wikipedia', 'Japan', 3093109, 'ジャッキー・ウー'), 140920.0),
 (('it.wikipedia', 'Ital

In [20]:
print(calculate_threshold(eps, delta, max_partitions, max_per_partition))

275.5415406595753


In [None]:
do_count(rdd, eps, delta, max_partitions, max_per_partition, "gaussian")

# Spark SQL implementation

In [28]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, IntegerType

In [29]:
rdd = spark.sql("""
SELECT
  pageview_info['page_title'] as page_title,
  page_id,
  pageview_info['project'] as project,
  geocoded_data['country'] as country,
  actor_signature
FROM wmf.pageview_actor
WHERE
    year = 2021 AND month = 8 AND day = 15 -- AND hour = 6
    AND page_id IS NOT NULL
""").rdd

In [37]:
# add laplace noise to a single number
@udf(returnType=FloatType())
def add_laplace_noise(x, eps, sensitivity):
    import pydp
    return x + pydp.distributions.LaplaceDistribution(eps, sensitivity).sample()

# add laplace noise to a spark df
def add_laplace_noise_to_df(df, eps, max_partitions, max_per_partition):
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    
    return df.withColumn("Laplacian_Noisy_Count", add_laplace_noise("count", eps_per_partition, sensitivity_per_partition))

In [38]:
# add gaussian noise to a single number
@udf(returnType=FloatType())
def add_gaussian_noise(x, eps, delta, sensitivity):
    import pydp
    sigma_squared = (2 * math.log(1.25 / delta) * sensitivity**2) / (eps**2)
    return x + pydp.distributions.GaussianDistribution(sigma_squared).sample()

# add laplace noise to a spark df
def add_gaussian_noise_to_df(df, eps, delta, max_partitions, max_per_partition):
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    
    return df.withColumn("Gaussian_Noisy_Count", add_gaussian_noise("count", eps_per_partition, sensitivity_per_partition))

In [32]:
def calculate_threshold(eps, delta, max_partitions, max_per_partition):
    eps_per_partition = eps / max_partitions
    sensitivity_per_partition = max_per_partition
    b = sensitivity_per_partition / eps_per_partition
    return -b * math.log(2 * b * delta)

In [33]:
# TODO: reduceByKey is generally an expensive operation, lots of data points spread around the cluster, need to combine them all
# TODO: optimization would largely come from using spark SQL rather than RDDs

# do bounded DP count
def do_count(rdd, eps, delta, max_partitions, max_per_partition, noise_kind):
    # rekey to a tuple of (actor signature, page id)
    # ((actor_signature, page_id), pageview)
    dp_count_rdd = rdd.map(lambda x: ((x.actor_signature, x.page_id), [x]))

    # randomly get a set of at most `max_per_partition` pageviews for each (actor signature, page id) tuple
    # ((actor_signature, page_id), [pageview]) {max length of max_per_partition}
    dp_count_rdd = dp_count_rdd.reduceByKey(lambda x, y: random.sample(x + y, min(len(x) + len(y), max_per_partition)))

    # rekey to just actor signature
    # (actor_signature, [pageview]) {with redundancies}
    dp_count_rdd = dp_count_rdd.map(lambda x: ((x[0][0], x[1])))

    # randomly get a set of at most `max_partitions` sets of pageviews for each actor signature
    # (actor_signature, [pageview]) {max length of max_per_partition * max_partitions}
    dp_count_rdd = dp_count_rdd.reduceByKey(lambda x, y: random.sample(x + y, min(len(x) + len(y), max_partitions)))

    # drop actor signature as key
    # ([pageview])
    dp_count_rdd = dp_count_rdd.map(lambda x: x[1])

    # unnest lists of pageviews using a flatmap
    # (pageview)
    dp_count_rdd = dp_count_rdd.flatMap(lambda x: x)
    
    df = dp_count_rdd.toDF()

    # now that contributions are bounded, count views per tuple
    df = df.groupBy(['project', 'country', 'page_id', 'page_title']).count()
    
    thresh = calculate_threshold(delta, eps, max_partitions, max_per_partition)

    # add laplace noise to counts, filter to threshold, round, and return the df
    if noise_kind == "laplace":
        df = add_laplace_noise_to_df(df, eps, max_partitions, max_per_partition)
        df = df.filter(df.Laplacian_Noisy_Count > thresh)
        df = df.foreach(lambda x: round(x.Laplacian_Noisy_Count, 0))
        return df.sort(df.Laplacian_Noisy_Count.desc()).take(500)
    elif noise_kind == "gaussian":
        df = add_gaussian_noise_to_df(df, eps, delta, max_partitions, max_per_partition)
        df = df.filter(df.Gaussian_Noisy_Count > thresh)
        df = df.foreach(lambda x: round(x.Gaussian_Noisy_Count, 0))
        return df.sort(df.Gaussian_Noisy_Count.desc()).take(500)

In [34]:
# total contributions (aka sensitivity) = max_per_partition * max_partitions
max_partitions = 5    # say that users can visit at most 5 pages
max_per_partition = 2 # and for each page they can contribute at most 2 pageviews

eps = 1
delta = 1 / (7e8 * math.sqrt(7e8))

In [35]:
print(calculate_threshold(eps, delta, max_partitions, max_per_partition))

275.5415406595753


In [39]:
do_count(rdd, eps, delta, max_partitions, max_per_partition, "laplace")

TypeError: Invalid argument, not a string or column: 0.2 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [None]:
do_count(rdd, eps, delta, max_partitions, max_per_partition, "gaussian")

In [26]:
from pyspark.sql.functions import col
import pandas as pd

dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
sampled.groupBy("key").count().orderBy("key").show()


NameError: name 'sqlContext' is not defined