## Data Security & Privacy in Workflows: Spark Solutions

In this notebook, you'll find one way to translate the Python processing steps into Spark. Remember: there are usually multiple ways to solve a problem in Spark! As long as yours worked, treat these simply as suggestions!

### Our Goal: Build a Privacy-First Sensor Map

- We want to ingest air quality sensor data from users, buildings and institutions who are willing to send us data to build an air quality map (similar to the [IQAir map](https://www.iqair.com/air-quality-map)).
- Users only want to share the data if they can remain anonymous and their location is fuzzy, so that they are protected against stalkers, prying eyes and state surveillance.
- Since the data is sensitive (it comes from people and their homes!), we want to make sure it is secured both at collection and at every intermediary hop.

Let's first take a look at our data and determine what can and should be done...

In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import when, split, udf, round as pys_round,\
    regexp_replace, rand, to_timestamp, expr, unix_timestamp



In [2]:
spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("data/air_quality.csv", header=True, inferSchema=True)



In [3]:
df.head()

Row(air_quality_index=200, user_id='maynardmorgan', location="('10.66668', '-61.51889', 'Port of Spain', 'TT', 'America/Port_of_Spain')", top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188')

In [4]:
df.select('location').show(1, truncate=False)

+-------------------------------------------------------------------------+
|location                                                                 |
+-------------------------------------------------------------------------+
|('10.66668', '-61.51889', 'Port of Spain', 'TT', 'America/Port_of_Spain')|
+-------------------------------------------------------------------------+
only showing top 1 row



In [5]:
df.printSchema()

root
 |-- air_quality_index: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- top_pollutant: string (nullable = true)
 |-- top_pollutant_concentration: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [6]:
df = df.withColumn("location", regexp_replace("location", "[()']", ""))

In [7]:
df.select('location').show(1, truncate=False)

+-------------------------------------------------------------+
|location                                                     |
+-------------------------------------------------------------+
|10.66668, -61.51889, Port of Spain, TT, America/Port_of_Spain|
+-------------------------------------------------------------+
only showing top 1 row



In [8]:
df = df.withColumn("location_arr", split("location", ", "))

In [9]:
df.head()

Row(air_quality_index=200, user_id='maynardmorgan', location='10.66668, -61.51889, Port of Spain, TT, America/Port_of_Spain', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', location_arr=['10.66668', '-61.51889', 'Port of Spain', 'TT', 'America/Port_of_Spain'])

In [10]:
df = df.withColumn('lat', df.location_arr.getItem(0).cast('float'))
df = df.withColumn('long', df.location_arr.getItem(1).cast('float'))
df = df.withColumn('city', df.location_arr.getItem(2))
df = df.withColumn('country', df.location_arr.getItem(3))
df = df.withColumn('timezone', df.location_arr.getItem(4))

In [11]:
cleaned_df = df.drop('location', 'location_arr')

In [12]:
cleaned_df.head()

Row(air_quality_index=200, user_id='maynardmorgan', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', lat=10.666680335998535, long=-61.518890380859375, city='Port of Spain', country='TT', timezone='America/Port_of_Spain')
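The `regexp_replace` + `split` approach in In [6] and In [8] works for this row, but it quietly assumes that no city name contains an apostrophe, a parenthesis, or the sequence `", "`. Here is a plain-Python sketch that compares it with parsing the value as a literal. It assumes, as the printed value suggests, that the column holds a Python tuple repr; the `tricky` row is hypothetical, not taken from the dataset.

```python
import ast
import re


def parse_like_spark(location):
    # Same steps as In [6] and In [8]: strip ( ) ' characters, then split on ", "
    return re.sub(r"[()']", "", location).split(", ")


def parse_as_tuple(location):
    # Assumes the column holds a Python tuple repr; literal_eval only accepts literals
    return list(ast.literal_eval(location))


row = "('10.66668', '-61.51889', 'Port of Spain', 'TT', 'America/Port_of_Spain')"
# Hypothetical row whose city name contains an apostrophe
tricky = "('42.35', '13.39', \"L'Aquila\", 'IT', 'Europe/Rome')"

print(parse_like_spark(row) == parse_as_tuple(row))  # True
print(parse_like_spark(tricky)[2])  # "LAquila" (double quotes kept, apostrophe lost)
print(parse_as_tuple(tricky)[2])    # L'Aquila
```

If the real feed can contain names like this, a more robust parse (or fixing the format upstream) may be worth the extra effort.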

What else is missing for our map? IQAir's map groups readings into air quality categories. We likely want to do something similar to break our map down into colors and ranges.

Based on the IQAir map, the ranges look to be about:

- Great: less than or equal to 50
- Good: 51-100
- Okay: 101-150
- Poor: 151-200
- Bad: 201-300
- Extremely Bad: 301+

Let's encode these as integer categories 1-6:

In [13]:
cleaned_df = cleaned_df.withColumn('air_quality_category', when(
    cleaned_df.air_quality_index <= 50, 1).when(
    cleaned_df.air_quality_index <= 100, 2).when(
    cleaned_df.air_quality_index <= 150, 3).when(
    cleaned_df.air_quality_index <= 200, 4).when(
    cleaned_df.air_quality_index <= 300, 5).otherwise(6))

In [14]:
cleaned_df.head()

Row(air_quality_index=200, user_id='maynardmorgan', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', lat=10.666680335998535, long=-61.518890380859375, city='Port of Spain', country='TT', timezone='America/Port_of_Spain', air_quality_category=4)
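A plain-Python version of the same ranges can serve as a reference when unit-testing the `when` chain in In [13]. One subtlety: if `air_quality_index` is ever null, every comparison in that chain evaluates to null, so the row falls through to `otherwise(6)` and a missing reading would be labeled "Extremely Bad". The hypothetical `air_quality_category` helper below keeps missing readings as `None`; in Spark, an explicit `isNull()` branch placed first should have the same effect.

```python
from bisect import bisect_left

# Inclusive upper bounds for categories 1-5; anything above 300 is category 6
UPPER_BOUNDS = [50, 100, 150, 200, 300]


def air_quality_category(aqi):
    """Map an AQI value to 1 (Great) ... 6 (Extremely Bad); None stays None."""
    if aqi is None:
        return None
    # bisect_left counts how many bounds are strictly below aqi
    return bisect_left(UPPER_BOUNDS, aqi) + 1


print([air_quality_category(v) for v in [50, 51, 200, 300, 301, None]])  # [1, 2, 4, 5, 6, None]
```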

#### What is sensitive here?

In [15]:
cleaned_df.sample(0.01).show(3, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------
 air_quality_index           | 300                        
 user_id                     | williamszachary            
 top_pollutant               | CO                         
 top_pollutant_concentration | 15                         
 timestamp                   | 2022-04-07T14:40:34.869188 
 lat                         | 42.06166                   
 long                        | -1.60452                   
 city                        | Tudela                     
 country                     | ES                         
 timezone                    | Europe/Madrid              
 air_quality_category        | 5                          
-RECORD 1-------------------------------------------------
 air_quality_index           | 280                        
 user_id                     | cathygreen                 
 top_pollutant               | CO                         
 top_pollutant_concentration | 81                       

#### How might we...?

- Protect user_id while still allowing it to be linkable?
- Remove potentially identifying precision in location?
- Remove potentially identifying information in the timestamp?
- Make these into scalable and repeatable actions for our workflow?

Let's work on these step by step!

In [17]:
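from ff3 import FF3Cipher

# Demo values only: in a real pipeline, load the key and tweak from a secrets manager
# instead of hard-coding them in a notebook.
key = "2DE79D232DF5585D68CE47882AE256D6"    # 128-bit AES key, hex-encoded
tweak = "CBD09280979564"                      # 56-bit tweak (14 hex characters), as used by FF3-1
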
c6 = FF3Cipher.withCustomAlphabet(key, tweak, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_")

plaintext = "michael______"
ciphertext = c6.encrypt(plaintext)

ciphertext

'krxcum4re8Ma8'

In [18]:
decrypted = c6.decrypt(ciphertext)
decrypted

'michael______'

In [19]:
def encrypt_username(username):
    c6 = FF3Cipher.withCustomAlphabet(key, tweak, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_")
    return c6.encrypt(username)

In [20]:
encrypt_username_udf = udf(lambda z: encrypt_username(z), StringType())

In [21]:
cleaned_df.select(encrypt_username_udf("user_id").alias("user_id")).show(truncate=False)



+----------------+
|user_id         |
+----------------+
|i4k66NK761Ent   |
|94aZuPNRjLAohOo6|
|Ey6xE           |
|zZK6B53iU89zb3A |
|kQsNYOdc        |
|QuhLaBk4M3r     |
|BlcYRS8xFOrso   |
|NKpe9OBs5       |
|Z0Zds7F9_3Sa    |
|x3RCCzFtFr      |
|bwW4XlcdQtkzUh  |
|D032TqE6cPk     |
|AuwSFCmD        |
|HYfSLx9Y2X96G   |
|KnDPX4HL_3IHl   |
|Ms2zBy3K0sUQt   |
|q5K3p7          |
|4OW8rwJX_Uzd3j  |
|Uw9GmAAN        |
|3mzAhB7u        |
+----------------+
only showing top 20 rows




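It looks like it's working, but with UDFs you never know. Remember, Spark evaluation is LAZY: `show()` only computes enough data to display the first 20 rows, so the UDF has not yet run on every record. To check that it works on the entire DataFrame, we need an action that touches every row, such as `collect()` (fine here because the dataset is small, but it pulls every result back to the driver). Let's test it out!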
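Python's own lazy iterators make a rough analogy for why the preview passed. In this sketch, `encrypt_stub` is a hypothetical stand-in for the FF3 call (not the `ff3` library) that rejects inputs shorter than 4 characters; taking only the first 20 results never reaches the bad value, while consuming everything does.

```python
from itertools import islice


def encrypt_stub(username):
    """Hypothetical stand-in for the FF3 call: fails on inputs under 4 characters."""
    if len(username) < 4:
        raise ValueError(f"message length {len(username)} is not within min 4 and max 32 bounds")
    return username[::-1]


# 25 valid usernames followed by one that is too short
usernames = [f"user{i:02d}" for i in range(25)] + ["bob"]

lazy_results = map(encrypt_stub, usernames)  # nothing has been computed yet
preview = list(islice(lazy_results, 20))     # like show(): only 20 values computed

failure = None
try:
    everything = list(map(encrypt_stub, usernames))  # like collect(): every row
except ValueError as err:
    failure = str(err)

print(len(preview))  # 20
print(failure)       # message length 3 is not within min 4 and max 32 bounds
```

Spark's scheduling is more involved than a Python `map` (it works partition by partition), but the lesson carries over: a successful preview says nothing about rows it never computed.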

In [22]:
cleaned_df.select(encrypt_username_udf("user_id")).collect()

Py4JJavaError: An error occurred while calling o115.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 14, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_68/2814461702.py", line 1, in <lambda>
  File "/tmp/ipykernel_68/1864050288.py", line 3, in encrypt_username
  File "/usr/local/lib/python3.7/site-packages/ff3/ff3.py", line 123, in encrypt
    return self.encrypt_with_tweak(plaintext, self.tweak)
  File "/usr/local/lib/python3.7/site-packages/ff3/ff3.py", line 173, in encrypt_with_tweak
    raise ValueError(f"message length {n} is not within min {self.minLen} and max {self.maxLen} bounds")
ValueError: message length 3 is not within min 4 and max 32 bounds



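Oh no! What happened here? The last line of the Python traceback tells us: `ValueError: message length 3 is not within min 4 and max 32 bounds`. At least one `user_id` is only 3 characters long, and FF3 with our 63-character alphabet cannot encrypt anything shorter than 4 characters. The rows computed by `show()` just happened not to include it. Let's pad short usernames up to the minimum length before encrypting.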
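The bounds in that error (min 4, max 32) appear to follow the FF3-1 parameter rules in NIST SP 800-38G Rev. 1: the domain must satisfy radix^minlen ≥ 1,000,000, and maxlen = 2·⌊log_radix(2^96)⌋. With a radix of 63, 63^3 = 250,047 is too small while 63^4 = 15,752,961 is enough, hence a minimum of 4. This sketch recomputes both bounds; the helper name `ff3_1_length_bounds` is ours, not part of the `ff3` library.

```python
ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_"


def ff3_1_length_bounds(radix):
    """Return (minlen, maxlen) following the FF3-1 rules in NIST SP 800-38G Rev. 1."""
    # minlen: smallest n with radix**n >= 1,000,000
    min_len = 1
    while radix ** min_len < 1_000_000:
        min_len += 1
    # maxlen: 2 * floor(log_radix(2**96)), computed with integers to avoid float error
    half = 0
    while radix ** (half + 1) <= 2 ** 96:
        half += 1
    return min_len, 2 * half


print(len(ALPHABET))                       # 63
print(ff3_1_length_bounds(len(ALPHABET)))  # (4, 32)
print(ff3_1_length_bounds(10))             # (6, 56)
```

Any username shorter than 4 or longer than 32 characters from this alphabet will therefore need special handling before it reaches the cipher.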

In [23]:
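def add_padding_and_encrypt(username):
    # Pass nulls through instead of crashing the whole Spark job
    if username is None:
        return None
    c6 = FF3Cipher.withCustomAlphabet(key, tweak, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_")
    # FF3 with this 63-character alphabet needs at least 4 characters
    if len(username) < 4:
        username += "X" * (4 - len(username))
    return c6.encrypt(username)

pad_and_encrypt_username_udf = udf(lambda y: add_padding_and_encrypt(y), StringType())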

cleaned_df.select(pad_and_encrypt_username_udf("user_id")).collect()

                                                                                

[Row(<lambda>(user_id)='i4k66NK761Ent'),
 Row(<lambda>(user_id)='94aZuPNRjLAohOo6'),
 Row(<lambda>(user_id)='Ey6xE'),
 Row(<lambda>(user_id)='zZK6B53iU89zb3A'),
 Row(<lambda>(user_id)='kQsNYOdc'),
 Row(<lambda>(user_id)='QuhLaBk4M3r'),
 Row(<lambda>(user_id)='BlcYRS8xFOrso'),
 Row(<lambda>(user_id)='NKpe9OBs5'),
 Row(<lambda>(user_id)='Z0Zds7F9_3Sa'),
 Row(<lambda>(user_id)='x3RCCzFtFr'),
 Row(<lambda>(user_id)='bwW4XlcdQtkzUh'),
 Row(<lambda>(user_id)='D032TqE6cPk'),
 Row(<lambda>(user_id)='AuwSFCmD'),
 Row(<lambda>(user_id)='HYfSLx9Y2X96G'),
 Row(<lambda>(user_id)='KnDPX4HL_3IHl'),
 Row(<lambda>(user_id)='Ms2zBy3K0sUQt'),
 Row(<lambda>(user_id)='q5K3p7'),
 Row(<lambda>(user_id)='4OW8rwJX_Uzd3j'),
 Row(<lambda>(user_id)='Uw9GmAAN'),
 Row(<lambda>(user_id)='3mzAhB7u'),
 Row(<lambda>(user_id)='gLyoXOXvNF'),
 Row(<lambda>(user_id)='uqe8_dw66T3o'),
 Row(<lambda>(user_id)='HPlGTY'),
 Row(<lambda>(user_id)='hsn3gZsel'),
 Row(<lambda>(user_id)='LQBW3ZeLLss9'),
 Row(<lambda>(user_id)='pc96jgM

This looks like it works now! Let's overwrite the `user_id` column with the encrypted values.

In [24]:
cleaned_df = cleaned_df.withColumn('user_id', pad_and_encrypt_username_udf("user_id"))

In [26]:
cleaned_df.head()

Row(air_quality_index=200, user_id='i4k66NK761Ent', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', lat=10.666680335998535, long=-61.518890380859375, city='Port of Spain', country='TT', timezone='America/Port_of_Spain', air_quality_category=4)

We are now technically leaking length information: FF3 is format-preserving, so each ciphertext has the same length as its (padded) username. We might decide that is acceptable, as long as access to both this dataset and the original data is tightly controlled. Alternatively, we could pad every username to a fixed length by default so that all ciphertexts have the same length. That would make a good homework exercise, along with writing a function that decrypts and removes the padding. One challenge: what happens if a username ends in X? :)
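One possible answer to the homework, sketched with hypothetical helpers: prefix the real length as two digits, then pad with "X" to a fixed 32 characters (assumed here because it is FF3-1's maximum for our 63-character alphabet). Because the length is stored explicitly, unpadding never confuses a real trailing "X" with padding. If nobody ever needs to recover the username, a keyed hash (HMAC-SHA256, a different technique from FF3) also gives linkable pseudonyms with no length leak.

```python
import hashlib
import hmac

FIXED_LEN = 32   # assumption: FF3-1's max length for the 63-character alphabet
PREFIX_LEN = 2   # two decimal digits record the real username length


def pad_fixed(username):
    """Hypothetical helper: '<2-digit length><username>XXX...' of exactly FIXED_LEN chars."""
    max_name = FIXED_LEN - PREFIX_LEN
    if len(username) > max_name:
        raise ValueError(f"username longer than {max_name} characters")
    prefixed = f"{len(username):02d}{username}"
    return prefixed + "X" * (FIXED_LEN - len(prefixed))


def unpad_fixed(padded):
    """Inverse of pad_fixed: trailing X's in the real username survive."""
    n = int(padded[:PREFIX_LEN])
    return padded[PREFIX_LEN:PREFIX_LEN + n]


# Alternative when the username never needs to be recovered: a keyed hash.
# Hypothetical demo key; a real one belongs in a secrets manager.
PSEUDONYM_KEY = b"demo-key-do-not-use"


def pseudonymize(username):
    digest = hmac.new(PSEUDONYM_KEY, username.encode("utf-8"), hashlib.sha256)
    return digest.hexdigest()[:16]  # truncated to 16 hex chars (64 bits)


print(unpad_fixed(pad_fixed("rexX")))  # rexX
```

With the cipher from In [17], encryption would become `c6.encrypt(pad_fixed(u))` and decryption `unpad_fixed(c6.decrypt(ct))`; every ciphertext is then 32 characters long, and the length prefix is encrypted along with the name.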


Now we can move onto our GPS data!

How precise is GPS data anyways? 🤔 (from [Wikipedia](https://en.wikipedia.org/wiki/Decimal_degrees))


decimal places | degrees    | approx. distance (at the equator)
-------------- | ---------- | --------
0              | 1          | 111 km
1              | 0.1        | 11.1 km
2              | 0.01       | 1.11 km
3              | 0.001      | 111 m
4              | 0.0001     | 11.1 m
5              | 0.00001    | 1.11 m
6              | 0.000001   | 11.1 cm
7              | 0.0000001  | 1.11 cm
8              | 0.00000001 | 1.11 mm
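The table's distances hold at the equator. A degree of latitude stays roughly 111 km everywhere, but a degree of longitude shrinks with the cosine of the latitude, so rounding both coordinates to 3 decimals (as In [28] does) snaps points to cells about 111 m tall and somewhat narrower away from the equator. A sketch assuming a spherical Earth and about 111.32 km per degree at the equator; the helper names are hypothetical:

```python
import math

# Approximate length of one degree at the equator (spherical-Earth simplification)
METERS_PER_DEGREE_AT_EQUATOR = 111_320


def grid_cell_meters(decimals, lat_deg):
    """Return (north_south, east_west) size in meters of the grid cell left
    after rounding latitude and longitude to `decimals` places."""
    step_deg = 10 ** -decimals
    north_south = step_deg * METERS_PER_DEGREE_AT_EQUATOR
    east_west = north_south * math.cos(math.radians(lat_deg))
    return north_south, east_west


def max_rounding_shift_meters(decimals, lat_deg):
    # Rounding moves a point by at most half a cell along each axis
    ns, ew = grid_cell_meters(decimals, lat_deg)
    return math.hypot(ns / 2, ew / 2)


print(grid_cell_meters(3, 10.667))           # cell size near Port of Spain
print(max_rounding_shift_meters(3, 10.667))  # worst-case displacement
```

Near Port of Spain this gives a cell of roughly 111 m by 109 m, so a published point can sit up to about 78 m from the real sensor. Whether that is fuzzy enough depends on how densely people live around it.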

In [27]:
cleaned_df.show(2, vertical=True)

-RECORD 0-------------------------------------------
 air_quality_index           | 200                  
 user_id                     | i4k66NK761Ent        
 top_pollutant               | PM10                 
 top_pollutant_concentration | 88                   
 timestamp                   | 2022-04-07T14:38:... 
 lat                         | 10.66668             
 long                        | -61.51889            
 city                        | Port of Spain        
 country                     | TT                   
 timezone                    | America/Port_of_S... 
 air_quality_category        | 4                    
-RECORD 1-------------------------------------------
 air_quality_index           | 200                  
 user_id                     | 94aZuPNRjLAohOo6     
 top_pollutant               | PM10                 
 top_pollutant_concentration | 45                   
 timestamp                   | 2022-04-07T14:39:... 
 lat                         | 34.25628       

In [28]:
cleaned_df = cleaned_df.withColumn('lat', pys_round('lat', 3))
cleaned_df = cleaned_df.withColumn('long', pys_round('long', 3))

In [29]:
cleaned_df.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------
 air_quality_index           | 200                        
 user_id                     | i4k66NK761Ent              
 top_pollutant               | PM10                       
 top_pollutant_concentration | 88                         
 timestamp                   | 2022-04-07T14:38:34.869188 
 lat                         | 10.667                     
 long                        | -61.519                    
 city                        | Port of Spain              
 country                     | TT                         
 timezone                    | America/Port_of_Spain      
 air_quality_category        | 4                          
-RECORD 1-------------------------------------------------
 air_quality_index           | 200                        
 user_id                     | 94aZuPNRjLAohOo6           
 top_pollutant               | PM10                       
 top_pollutant_concentration | 45                       


What type of risk should we be aware of with regard to timestamp precision? When and how do we need to de-risk this type of information?

In [30]:
cleaned_df = cleaned_df.withColumn('timestamp', to_timestamp('timestamp'))

In [31]:
cleaned_df.withColumn('new_timestamp', (unix_timestamp('timestamp') + 
                                    (rand() * 60) + (rand() * 60 * 20)).cast('timestamp')).select('timestamp', 'new_timestamp').show(truncate=False)

+--------------------------+--------------------------+
|timestamp                 |new_timestamp             |
+--------------------------+--------------------------+
|2022-04-07 14:38:34.869188|2022-04-07 14:48:33.551165|
|2022-04-07 14:39:16.869188|2022-04-07 14:51:29.564218|
|2022-04-07 14:39:51.869188|2022-04-07 14:46:45.187155|
|2022-04-07 14:40:34.869188|2022-04-07 14:42:08.050917|
|2022-04-07 14:41:12.869188|2022-04-07 14:59:11.461179|
|2022-04-07 14:41:55.869188|2022-04-07 14:58:12.785815|
|2022-04-07 14:42:33.869188|2022-04-07 14:56:16.421431|
|2022-04-07 14:43:08.869188|2022-04-07 14:59:48.161188|
|2022-04-07 14:43:54.869188|2022-04-07 14:56:24.036884|
|2022-04-07 14:44:36.869188|2022-04-07 14:50:59.744584|
|2022-04-07 14:45:24.869188|2022-04-07 14:58:46.534095|
|2022-04-07 14:46:14.869188|2022-04-07 14:55:49.995396|
|2022-04-07 14:46:47.869188|2022-04-07 14:54:36.319058|
|2022-04-07 14:47:27.869188|2022-04-07 14:56:41.944552|
|2022-04-07 14:47:53.869188|2022-04-07 15:05:06.

In [32]:
cleaned_df = cleaned_df.withColumn('timestamp', (unix_timestamp('timestamp') + 
                        (rand() * 60) + (rand() * 60 * 20)).cast('timestamp'))
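The expression above adds `rand() * 60 + rand() * 60 * 20` seconds, a random shift between 0 and 21 minutes that always moves forward in time, so an observer still knows each reading happened no later than its published timestamp. Two alternatives are sketched below in plain Python, with hypothetical helper names: symmetric jitter around the true time, or coarsening to fixed windows. If you port them back to Spark, `date_trunc` or `window` from `pyspark.sql.functions` look like natural candidates for the coarsening approach.

```python
import random
from datetime import datetime, timedelta


def jitter_symmetric(ts, max_minutes=10, rng=random):
    # Shift by a uniform offset in [-max_minutes, +max_minutes]
    offset_s = rng.uniform(-max_minutes * 60, max_minutes * 60)
    return ts + timedelta(seconds=offset_s)


def coarsen(ts, window_minutes=15):
    # Floor to the start of the window (assumes window_minutes divides 60)
    minute = ts.minute - ts.minute % window_minutes
    return ts.replace(minute=minute, second=0, microsecond=0)


original = datetime(2022, 4, 7, 14, 38, 34, 869188)
print(coarsen(original))  # 2022-04-07 14:30:00
print(jitter_symmetric(original, rng=random.Random(42)))
```

Coarsening is deterministic and easy to explain to users ("readings are reported per 15 minutes"), while jitter keeps a finer-grained timeline at the cost of per-row noise.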

In [33]:
cleaned_df.head()


Row(air_quality_index=200, user_id='i4k66NK761Ent', top_pollutant='PM10', top_pollutant_concentration='88', timestamp=datetime.datetime(2022, 4, 7, 14, 43, 18, 902818), lat=10.666999816894531, long=-61.51900100708008, city='Port of Spain', country='TT', timezone='America/Port_of_Spain', air_quality_category=4)
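Notice `lat=10.666999816894531` rather than `10.667`. In In [10] the coordinates were cast to `'float'`, Spark's 32-bit type, and 10.667 has no exact 32-bit representation. The extra digits are binary rounding noise, not leaked GPS precision. This sketch reproduces the effect with `struct`; casting to `'double'` in In [10] instead should make the rounded values print cleanly.

```python
import struct


def to_float32(x):
    """Round a 64-bit Python float to the nearest 32-bit float (like Spark's FloatType)."""
    return struct.unpack("f", struct.pack("f", x))[0]


print(to_float32(10.667))    # 10.666999816894531
print(to_float32(10.66668))  # 10.666680335998535
print(10.667)                # 10.667 (a 64-bit double prints cleanly)
```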

In [34]:
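# Re-sort by the jittered timestamp so the row order no longer follows the original, exact timestamps
cleaned_df = cleaned_df.orderBy(cleaned_df.timestamp.asc())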

In [35]:
!rm -rf data/data_for_marketing

In [36]:
# Include a header row so downstream consumers know which column is which
cleaned_df.write.format('csv').option('header', 'true').save('data/data_for_marketing')

                                                                                

### Congratulations!! 

You've walked through potential privacy snags and helped increase the protection for the individuals sending you their air quality details! Now developers can use this dataset and we have ensured that there are some base protections. As you may have noticed, it wasn't always obvious what we should do -- but by thinking through each data type and determining what worked to balance the utility of the data and the privacy we want to offer, we were able to find some ways to protect individuals. 

A good set of questions to ask for guidance is:

- Where will this data be accessed and used? How safe is this environment?
- What person-related data do we actually need to use to deliver this service or product? (data minimization!)
- What other protections will be added to this data before it is seen or used? (e.g. encryption at rest, access control systems, or other protections when it reaches another processing point or sink!)
- What privacy and security expectations do we want to set for the individuals in this dataset?
- Where can we opportunistically add more protection while not hindering the work of data scientists, data analysts, software engineers and other colleagues?


As you continue on your data engineering journey, you'll likely encounter many more situations where you'll need to make privacy and security decisions. If you'd like to learn more, or even work as a privacy or security champion, feel free to join your organization's programs that support topics like this!