In [1]:
#### Importing needed libraries
import pandas as pd
import os
import opendp.smartnoise.core as sn
import numpy as np
import math
from datetime import datetime
import time

In [2]:
#### Reading the df
df = pd.read_csv('./ny_dummy.csv')

In [3]:
df.head()

Unnamed: 0,transitions,activity_day,uuid,from_fips,to_fips
0,32,01/04/21,c67962c2-21ba-42d1-9bcd-51d873681d04,36015,36107
1,61,01/04/21,fde17b46-ba00-46ad-b752-e30f907a2e38,36019,36031
2,476,01/04/21,3442ce64-1286-4631-94c5-b8b99814f16c,36097,36097
3,33,01/04/21,2dac2670-4823-4356-8ff2-de390b6d5f7e,36057,36093
4,126,01/04/21,e6708dab-261d-4c28-a8cf-d2fec9f68387,36083,36091


#### Documentation for SmartNoise SQL modules - https://github.com/opendp/smartnoise-samples/tree/master/data

#### Documentation for metadata specification - https://github.com/opendp/smartnoise-sdk/blob/main/sdk/Metadata.md 

#### Generating unique IDs -- PandasReader throws an error without specifying a primary key

In [4]:
#### Getting min-max for YAML specs

df['transitions'].describe()

count     4766.000000
mean      1973.857113
std       7083.665042
min          0.000000
25%         40.000000
50%        103.000000
75%        757.500000
max      63865.000000
Name: transitions, dtype: float64

PandasReader

In [11]:
#### Applying DP on aggregations 
from opendp.smartnoise.sql import PandasReader, PrivateReader
from opendp.smartnoise.metadata import CollectionMetadata

#Collecting metadata info from YAML
meta = CollectionMetadata.from_file('./transitions.yaml')

#Query to be privatized
query = "SELECT from_fips, to_fips, activity_day, SUM(transitions) AS avg_transitions from mobility.activity GROUP BY from_fips, to_fips, activity_day ORDER BY from_fips, to_fips, activity_day"

#Reading the DF
reader = PandasReader(df, meta)

#Passing reader to private reader
private_reader = PrivateReader(reader, meta, 4)

#Executing the query
result_dp = private_reader.execute_df(query)

#Getting results
print(result_dp)

Empty DataFrame
Columns: [from_fips, to_fips, activity_day, avg_transitions]
Index: []


Returns empty df with current metadata specs. Adding the following specs to metadata :       
    <b>max_ids: 500
    
   sample_max_ids: True </b>
   
   
renders the following output : 

In [8]:
#### Applying DP on aggregations 
from opendp.smartnoise.sql import PandasReader, PrivateReader
from opendp.smartnoise.metadata import CollectionMetadata

#Collecting metadata info from YAML
meta = CollectionMetadata.from_file('./transitions.yaml')

#Query to be privatized
query = "SELECT from_fips, to_fips, activity_day, AVG(transitions) AS avg_transitions from mobility.activity GROUP BY from_fips, to_fips, activity_day ORDER BY from_fips, to_fips, activity_day"

#Reading the DF
reader = PandasReader(df, meta)

#Passing reader to private reader
private_reader = PrivateReader(reader, meta, 0.1)

#Executing the query
result_dp = private_reader.execute_df(query)

#Getting results
print(result_dp)

    from_fips to_fips activity_day  avg_transitions
0       36001   36039     02/04/21    -14833.532351
1       36001   36039     03/04/21     82675.953206
2       36001   36065     01/04/21    -20903.720390
3       36001   36065     02/04/21     10467.353729
4       36001   36083     01/04/21     55719.587531
..        ...     ...          ...              ...
438     36123   36101     02/04/21    -37939.250894
439     36123   36101     03/04/21     12228.624132
440     36123   36123     01/04/21     -6814.713375
441     36123   36123     02/04/21      -151.449129
442     36123   36123     03/04/21    -50486.429582

[443 rows x 4 columns]


In [14]:
#Original Values
gdf = df.groupby(['from_fips','to_fips','activity_day'])['transitions'].mean().reset_index()
gdf = gdf.dropna()
gdf.head(10)

Unnamed: 0,from_fips,to_fips,activity_day,transitions
0,36001,36001,2021-04-01,7855.333333
1,36001,36001,2021-04-02,7568.5
2,36001,36001,2021-04-03,7088.833333
140,36001,36021,2021-04-01,29.0
141,36001,36021,2021-04-02,36.5
142,36001,36021,2021-04-03,32.5
239,36001,36035,2021-04-02,20.0
240,36001,36035,2021-04-03,17.0
266,36001,36039,2021-04-01,63.4
267,36001,36039,2021-04-02,70.0


Spark Implementation

In [21]:
import findspark

In [22]:
findspark.init()

In [23]:
#### Importing spark libraries
import pyspark
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()

In [25]:
#### Reading df with spark

from pyspark.sql.types import FloatType, BooleanType

filepath = "./ny_dummy.csv"

transition = spark.read.load(filepath, format="csv", sep=",",inferSchema="true", header="true")

transition = transition.withColumn("from_fips", col("from_fips").cast(StringType()))
transition = transition.withColumn("to_fips", col("to_fips").cast(StringType()))
transition = transition.withColumn("uuid", col("uuid").cast(StringType()))
transition = transition.withColumn("transitions", col("transitions").cast(FloatType()))
transition = transition.withColumn("activity_day", col("activity_day").cast(StringType()))
transition.show(5)
print("There are {0} individuals in the data".format(transition.count()))

+-----------+------------+--------------------+---------+-------+
|transitions|activity_day|                uuid|from_fips|to_fips|
+-----------+------------+--------------------+---------+-------+
|       32.0|    01/04/21|c67962c2-21ba-42d...|    36015|  36107|
|       61.0|    01/04/21|fde17b46-ba00-46a...|    36019|  36031|
|      476.0|    01/04/21|3442ce64-1286-463...|    36097|  36097|
|       33.0|    01/04/21|2dac2670-4823-435...|    36057|  36093|
|      126.0|    01/04/21|e6708dab-261d-4c2...|    36083|  36091|
+-----------+------------+--------------------+---------+-------+
only showing top 5 rows

There are 4766 individuals in the data


In [26]:
#### Running exact queries

from opendp.smartnoise.sql import SparkReader

transition.createOrReplaceTempView("transitions")

reader = SparkReader(spark)
query = 'SELECT COUNT(*) FROM transitions'


res = reader.execute_typed(query)
res.show()

+--------+
|count(1)|
+--------+
|    4766|
+--------+



In [31]:
#### Testing exact queries

query = 'SELECT from_fips,to_fips, AVG(transitions) AS transitions, activity_day FROM transitions GROUP BY from_fips,to_fips, activity_day'
synopsis = reader.execute(query)
synopsis.show(5)
print("{0} distinct dimensions".format(synopsis.count()))

+---------+-------+------------------+------------+
|from_fips|to_fips|       transitions|activity_day|
+---------+-------+------------------+------------+
|    36121|  36029|              47.8|    01/04/21|
|    36083|  36115|              13.0|    01/04/21|
|    36031|  36031|             840.5|    02/04/21|
|    36051|  36003|              17.0|    02/04/21|
|    36011|  36011|1919.8333333333333|    03/04/21|
+---------+-------+------------------+------------+
only showing top 5 rows

1175 distinct dimensions


In [29]:
#### Max of column needed for YAML specs

query = 'SELECT MAX(transitions) FROM transitions'
res = reader.execute_typed(query)
res.show()

+----------------+
|max(transitions)|
+----------------+
|         63865.0|
+----------------+



In [32]:
#### Running the DP query

from opendp.smartnoise.sql import PrivateReader
from opendp.smartnoise.metadata import CollectionMetadata

meta = CollectionMetadata.from_file('./mobility_ny.yaml')

epsilon = 4.0

private = PrivateReader(reader, meta, 0.1)

private.reader.compare.search_path = ["mobility"]
#private.options.row_privacy = True
#private.options.clamp_counts = False
#private.options.censor_dims = False

private_synopsis = private.execute(query) # using same query from before

private_synopsis.show(5)
print("{0} distinct dimensions".format(private_synopsis.count()))

IndexError: list index out of range