In [65]:
!pip3 install tmlt-analytics

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [99]:
import os
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from tmlt.analytics.privacy_budget import PureDPBudget
from tmlt.analytics.protected_change import AddOneRow
from tmlt.analytics.query_builder import QueryBuilder
from tmlt.analytics.session import Session

In [105]:

spark = SparkSession.builder.getOrCreate()
spark.stop()
spark = SparkSession.builder.getOrCreate()

In [106]:
spark.sparkContext.addFile(
"/downloads/transactions.csv")
trans_df = spark.read.csv(
    SparkFiles.get("/downloads/transactions.csv"), header=True, inferSchema=True
)


In [107]:
trans_df.head(5)

[Row(_c0=0, TRANSACTION_ID=0, TX_DATETIME=datetime.datetime(2023, 2, 1, 0, 43, 37), CUSTOMER_ID=901, TERMINAL_ID=8047, TX_AMOUNT=82, TX_TIME_SECONDS=2617, TX_TIME_DAYS=0),
 Row(_c0=1, TRANSACTION_ID=1, TX_DATETIME=datetime.datetime(2023, 2, 1, 1, 20, 13), CUSTOMER_ID=2611, TERMINAL_ID=7777, TX_AMOUNT=15, TX_TIME_SECONDS=4813, TX_TIME_DAYS=0),
 Row(_c0=2, TRANSACTION_ID=2, TX_DATETIME=datetime.datetime(2023, 2, 1, 1, 22, 52), CUSTOMER_ID=4212, TERMINAL_ID=3336, TX_AMOUNT=53, TX_TIME_SECONDS=4972, TX_TIME_DAYS=0),
 Row(_c0=3, TRANSACTION_ID=3, TX_DATETIME=datetime.datetime(2023, 2, 1, 1, 26, 40), CUSTOMER_ID=1293, TERMINAL_ID=7432, TX_AMOUNT=59, TX_TIME_SECONDS=5200, TX_TIME_DAYS=0),
 Row(_c0=4, TRANSACTION_ID=4, TX_DATETIME=datetime.datetime(2023, 2, 1, 1, 52, 23), CUSTOMER_ID=2499, TERMINAL_ID=1024, TX_AMOUNT=25, TX_TIME_SECONDS=6743, TX_TIME_DAYS=0)]

In [108]:
session = Session.from_dataframe(
    privacy_budget=PureDPBudget(3.5),
    source_id="transactions",
    dataframe=trans_df,
    protected_change=AddOneRow(),
)

In [109]:
count_query = QueryBuilder("transactions").count()

In [110]:
total_count = session.evaluate(
    count_query,
    privacy_budget=PureDPBudget(1)
)
total_count.show()



+-------+
|  count|
+-------+
|4557168|
+-------+



In [111]:
total_count = trans_df.count()
print(total_count)

4557166


In [112]:
session.describe()

The session has a remaining privacy budget of PureDPBudget(epsilon=2.5).
The following private tables are available:
Table 'transactions' (no constraints):
	Columns:
		- '_c0'              INTEGER
		- 'TRANSACTION_ID'   INTEGER
		- 'TX_DATETIME'      TIMESTAMP
		- 'CUSTOMER_ID'      INTEGER
		- 'TERMINAL_ID'      INTEGER
		- 'TX_AMOUNT'        INTEGER
		- 'TX_TIME_SECONDS'  INTEGER
		- 'TX_TIME_DAYS'     INTEGER


In [113]:
low_purchagers= QueryBuilder("transactions").filter("TX_AMOUNT < 25").count()
low_purchagers_count = session.evaluate(
    low_purchagers,
    privacy_budget=PureDPBudget(epsilon=1),
)
low_purchagers_count.show()

+-------+
|  count|
+-------+
|1024844|
+-------+



In [114]:
print(session.remaining_privacy_budget)

PureDPBudget(epsilon=1.5)


In [115]:
med_purchagers= QueryBuilder("transactions").filter("TX_AMOUNT >25 AND TX_AMOUNT <50").count()
med_purchagers_count = session.evaluate(
    med_purchagers,
    privacy_budget=PureDPBudget(epsilon=1),
)
med_purchagers_count.show()

+-------+
|  count|
+-------+
|1165384|
+-------+



In [116]:
print(session.remaining_privacy_budget)

PureDPBudget(epsilon=0.5)


In [117]:
high_purchagers= QueryBuilder("transactions").filter("TX_AMOUNT > 50").count()
high_purchagers_count = session.evaluate(
    high_purchagers,
    privacy_budget=PureDPBudget(epsilon=1),
)
high_purchagers_count.show()

RuntimeError: ignored

In [118]:
high_purchagers= QueryBuilder("transactions").filter("TX_AMOUNT > 50").count()
high_purchagers_count = session.evaluate(
    high_purchagers,
    privacy_budget=session.remaining_privacy_budget,
)
high_purchagers_count.show()



+-------+
|  count|
+-------+
|2271804|
+-------+



In [119]:
budget = PureDPBudget(epsilon=2.5) # maximum budget consumed in the Session
session = Session.from_dataframe(
    privacy_budget=budget,
    source_id="transactions",
    dataframe=trans_df,
    protected_change=AddOneRow(),
)


In [120]:
from tmlt.analytics.keyset import KeySet
terminal_ids = KeySet.from_dict({
    "TERMINAL_ID": [
        1,2,3,4,5,6,7,8,9,10
    ]
})

In [121]:
average_purchase_query = (
    QueryBuilder("transactions")
    .groupby(terminal_ids)
    .average("TX_AMOUNT", low=5, high=100)
)
average_purchages= session.evaluate(
    average_purchase_query,
    privacy_budget=PureDPBudget(1),
)
average_purchages.show(truncate=False)

+-----------+------------------+
|TERMINAL_ID|TX_AMOUNT_average |
+-----------+------------------+
|1          |55.93609022556391 |
|2          |52.93446601941748 |
|3          |40.95974576271186 |
|4          |52.02414486921529 |
|5          |47.511428571428574|
|6          |52.276595744680854|
|7          |51.566233766233765|
|8          |50.12273641851107 |
|9          |52.88358208955224 |
|10         |48.98945147679325 |
+-----------+------------------+



In [122]:
from tmlt.analytics.protected_change import AddRowsWithID
budget = PureDPBudget(epsilon=2.5) # maximum budget consumed in the Session
session = Session.from_dataframe(
    privacy_budget=budget,
    source_id="transactions",
    dataframe=trans_df,
    protected_change=AddRowsWithID(id_column="CUSTOMER_ID"),
)

In [123]:
keyset = KeySet.from_dataframe(
    trans_df.select("TERMINAL_ID", "TX_AMOUNT")
)
count_query = (
    QueryBuilder("transactions")
    .groupby(keyset)
    .count()
)
result = session.evaluate(count_query, PureDPBudget(1))

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


RuntimeError: ignored

In [124]:
from tmlt.analytics.constraints import (
    MaxGroupsPerID,
    MaxRowsPerGroupPerID,
    MaxRowsPerID,
)
keyset = KeySet.from_dataframe(
    trans_df.select("TERMINAL_ID", "TX_AMOUNT")
)
count_query = (
    QueryBuilder("transactions")
    .enforce(MaxRowsPerID(100))
    .groupby(keyset)
    .count()
)
result = session.evaluate(count_query, PureDPBudget(1))

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [125]:
top_five = result.sort("count", ascending=False).limit(5)
top_five.show()

+-----------+---------+-----+
|TERMINAL_ID|TX_AMOUNT|count|
+-----------+---------+-----+
|       3001|       98| 1240|
|       3536|       42| 1217|
|       4359|       71| 1212|
|       9137|       97| 1145|
|       7179|       76| 1143|
+-----------+---------+-----+



In [126]:
result.show()

+-----------+---------+-----+
|TERMINAL_ID|TX_AMOUNT|count|
+-----------+---------+-----+
|          0|        4|  401|
|          0|        7|  224|
|          0|       11|   -7|
|          0|       12|  131|
|          0|       16|  -35|
|          0|       18|  -68|
|          0|       20| -126|
|          0|       24|  -46|
|          0|       26| -162|
|          0|       28|  -30|
|          0|       31|  447|
|          0|       33|   23|
|          0|       35|   20|
|          0|       44|   96|
|          0|       49|  -56|
|          0|       51|  211|
|          0|       58|  -88|
|          0|       59|  -27|
|          0|       60| -254|
|          0|       61|  525|
+-----------+---------+-----+
only showing top 20 rows

