## Data Security & Privacy in Workflows

In this notebook, we'll explore ways to incorporate data security and privacy into large-scale data workflows. First, you might be wondering why we need to worry about these topics. Shouldn't this be a problem solved by the privacy department, infosec or via product owners? 

You can think of large scale data workflows like folks who manage the internet. We don't often see their work, but we know when it's broken! They probably deserve a lot more credit and attention for it, but we somehow just expect it "to work ubiquitously." And we certainly expect the data we send around the internet to be kept private and secure (although in some geographies it is less likely...). If it wasn't for the work on those large-scale packet pipelines, then we couldn't trust technologies like SSL, TLS or our web applications or mobile applications. Those are enabled, propogated and enforced by all the intermediary hops, meaning the packet and data are handled with the same promises as they arrived. Hopefully you are getting the picture here -- security and privacy have to be baked into the architecture and data flow from the start, and cannot be simply "tacked on" at a given endpoint.

So now we understand our responsibilities as the folks building the data backbones. What privacy and security concerns do we actually have? We'll walk through a concrete example to have a look!

### Example Data Product: Ingest Air Quality Sensors while Protecting User Privacy

- We want to ingest air quality sensor data from users, buildings and institutions who are willing to send us data to build an air quality map (similar to the [IQAir map](https://www.iqair.com/air-quality-map).
- Users only want to share the data if they can remain anonymous and their location is fuzzy, so that they are protected against stalkers, prying eyes and state surveillance.
- Since the data is sensitive (from people and their homes!), we want to sure that it is secured either at collection, as well as at any intermediary hops.

Let's first take a look at our data and determine what can and should be done...

In [None]:
%run ./init

Python interpreter will be restarted.
Python interpreter will be restarted.


In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, TimestampType
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, lit, col

import pandas as pd
import os

cwd = os.getcwd()
root_dir = ""

if (cwd == "/databricks/driver"):
    root_dir = f"{working_directory}/"
else:
    root_dir = ""

df = spark.read.csv(f'{root_dir}/data/air_quality.csv', header=True, inferSchema=True)



Out[1]: True

Out[4]: [FileInfo(path='dbfs:/FileStore/samvardhan.gopathy/dataSecurity/data/air_quality.csv', name='air_quality.csv', size=11244658, modificationTime=1679599690000)]

I need to parse this properly in Spark... luckily there are a few helper functions that can help me with this problem! Let's explore them together :)

First, I wnat to get an example row and see the schema so I can be sure that the data types are what I expect...

In [None]:
df.select('location').show(1, truncate=False)

+-------------------------------------------------------------------------+
|location                                                                 |
+-------------------------------------------------------------------------+
|('10.66668', '-61.51889', 'Port of Spain', 'TT', 'America/Port_of_Spain')|
+-------------------------------------------------------------------------+
only showing top 1 row



In [None]:
df.printSchema()

root
 |-- air_quality_index: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- top_pollutant: string (nullable = true)
 |-- top_pollutant_concentration: string (nullable = true)
 |-- timestamp: string (nullable = true)



Now I want to remove some of the extra characters around the string so that it is easier to parse. I will use regular expressions to do that in a few easy-to-read commands.

In [None]:
def replace_brackets_in_location(df): 
# remove extra characters - "[()']" from the location column string    

[0;36m  File [0;32m"<command-2812539252775323>"[0;36m, line [0;32m2[0m
[0;31m    # remove extra characters - "[()']" from the location column string[0m
[0m                                                                           ^[0m
[0;31mIndentationError[0m[0;31m:[0m expected an indented block


In [None]:
def test_replace_brackets_in_location():
  pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2, 3]),
          'location': pd.Series(["New York, NY", "(Los Angeles), CA", "Chicago' IL"]),
      }
  )
  expected_pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2, 3]),
          'location': pd.Series(["New York, NY", "Los Angeles, CA", "Chicago IL"]),
      }
  )
  spark_df = spark.createDataFrame(pandas_df)
  expected_df = spark.createDataFrame(expected_pandas_df)

  fixed_df = replace_brackets_in_location(spark_df)
  
  assert fixed_df.schema == expected_df.schema
  assert fixed_df.collect() == expected_df.collect()

  print("All tests passed :)")
  
test_replace_brackets_in_location()

All tests passed :)


##### Solution

In [None]:
def replace_brackets_in_location(df): 
    return df.withColumn("location", F.regexp_replace("location", "[()']", ""))

In [None]:
df = replace_brackets_in_location(df)

In [None]:
df.select('location').show(1, truncate=False)

+-------------------------------------------------------------+
|location                                                     |
+-------------------------------------------------------------+
|10.66668, -61.51889, Port of Spain, TT, America/Port_of_Spain|
+-------------------------------------------------------------+
only showing top 1 row



Now it's easier for me to split the string and properly parse it. Spark has a split function that can help. I will put this into a new column called location_arr for location array.

In [None]:
def split_location(df):
# split the "location" column into an array of strings    

In [None]:
from  pyspark.sql.types import IntegerType, StructField, StringType, ArrayType, FloatType, StructType

def test_split_location():
  schema = StructType([
      StructField("id", IntegerType()),
      StructField("location", StringType())
  ])
  pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2, 3]),
          'location': pd.Series(["New York, NY", "Los Angeles, CA", "Chicago, IL"]),
      }
  )
  expected_schema = StructType([
      StructField("id", IntegerType()),
      StructField("location", StringType()),
      StructField("location_arr", ArrayType(StringType(), False))
  ])
  expected_pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2, 3]),
          'location': pd.Series(["New York, NY", "Los Angeles, CA", "Chicago, IL"]),
          'location_arr': pd.Series([ ["New York","NY"], ["Los Angeles","CA"], ["Chicago","IL"]]),
      }
  )
  spark_df = spark.createDataFrame(pandas_df,schema)
  expected_df = spark.createDataFrame(expected_pandas_df,expected_schema)

  fixed_df = split_location(spark_df)
 
  assert fixed_df.schema == expected_df.schema
  assert fixed_df.collect() == expected_df.collect()

  print("All tests passed :)")
  
test_split_location()

All tests passed :)


##### Solution

In [None]:
def split_location(df):
    return df.withColumn("location_arr", F.split("location", ", "))

In [None]:
df = split_location(df)

In [None]:
df.head()

Out[16]: Row(air_quality_index=200, user_id='maynardmorgan', location='10.66668, -61.51889, Port of Spain, TT, America/Port_of_Spain', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', location_arr=['10.66668', '-61.51889', 'Port of Spain', 'TT', 'America/Port_of_Spain'])

Looking much better!!! Now we need to fix the schema problems... and take these from the location array and put them into actual columns!

In [None]:
def create_cols_for_location_arr_items(df):
# The 'location' column should be split into separate columns lat,long,city,country and timezone

In [None]:
def test_create_cols_for_location_arr_items():
  schema = StructType([
      StructField("id", IntegerType()),
      StructField("location", StringType()),
       StructField("location_arr", ArrayType(StringType(), False))
  ])
  pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2]),
          'location': pd.Series(["New York, NY", "Los Angeles, CA"]),
          'location_arr' : pd.Series([ ["40.7128","-74.0060","New York","USA","America/New_York"], ["34.0522","-118.2437","Los Angeles","USA","America/Los_Angeles"]])
      }
  )
  expected_schema = StructType([
      StructField("id", IntegerType()),
      StructField("location", StringType()),
      StructField("location_arr", ArrayType(StringType(), False)),
      StructField("lat", FloatType()),
      StructField("long", FloatType()),
      StructField("city", StringType()),
      StructField("country", StringType()),
      StructField("timezone", StringType())
  ])
  expected_pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2]),
          'location': pd.Series(["New York, NY", "Los Angeles, CA"]),
          'location_arr' : pd.Series([ ["40.7128","-74.0060","New York","USA","America/New_York"], ["34.0522","-118.2437","Los Angeles","USA","America/Los_Angeles"]]),
          'lat' : pd.Series([40.7128, 34.0522]),
          'long' : pd.Series([-74.0060, -118.2437]),
          'city' : pd.Series(["New York", "Los Angeles"]),
          'country' : pd.Series(["USA", "USA"]),
          'timezone' : pd.Series(["America/New_York", "America/Los_Angeles"]),
      }
  )
  spark_df = spark.createDataFrame(pandas_df,schema)
  expected_df = spark.createDataFrame(expected_pandas_df,expected_schema)

  fixed_df = create_cols_for_location_arr_items(spark_df)
 

  assert fixed_df.schema == expected_df.schema
  assert fixed_df.collect() == expected_df.collect()

  print("All tests passed :)")
  
test_create_cols_for_location_arr_items()

All tests passed :)


##### Solution

In [None]:
def create_cols_for_location_arr_items(df):
    return df.withColumn('lat', df.location_arr.getItem(0).cast('float')) \
    .withColumn('long', df.location_arr.getItem(1).cast('float')) \
    .withColumn('city', df.location_arr.getItem(2)) \
    .withColumn('country', df.location_arr.getItem(3)) \
    .withColumn('timezone', df.location_arr.getItem(4))

In [None]:
df = create_cols_for_location_arr_items(df)
df.printSchema()

root
 |-- air_quality_index: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- top_pollutant: string (nullable = true)
 |-- top_pollutant_concentration: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- location_arr: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- timezone: string (nullable = true)



The schema above looks much better, now I will drop the old columns since they are useless to me now...

In [None]:
df = df.drop('location', 'location_arr')

Next I need to create my air quality categories as follows...

Based on the IQAir map, the ranges look to be about:

    Great: less than or equal to 50
    Good: 51-100
    Okay: 101-150
    Poor: 151-200
    Bad: 201-300
    Extremely Bad: 301+

Let's make these into integer values 1-6

In [None]:
df = df.withColumn('air_quality_category', F.when(
    df.air_quality_index <= 50, 1).when(
    df.air_quality_index <= 100, 2).when(
    df.air_quality_index <= 150, 3).when(
    df.air_quality_index <= 200, 4).when(
    df.air_quality_index <= 300, 5).otherwise(6))

In [None]:
df.head()

Out[22]: Row(air_quality_index=200, user_id='maynardmorgan', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', lat=10.666680335998535, long=-61.518890380859375, city='Port of Spain', country='TT', timezone='America/Port_of_Spain', air_quality_category=4)

### So what even is sensitive information?

Categories of sensitive information:

- **Personally Identifiable Information (PII)**: This is information that we can directly link to a person without much effort. This includes information like email address, IP address, legal name, address, birth date, gender and so forth. Even just one of these pieces of information can be enough to directly identify someone in a dataset.
- **Person-Related Information**: This is data that is created by a person and that likely has some personal artifacts. For example, [web browsing histories are fairly unique](https://blog.lukaszolejnik.com/web-browsing-histories-are-private-personal-data-now-what/), so is location data (i.e. Where do you sleep at night? Where do you work?) and even your likes on social media can statistically reveal sensitive attributes, such as your gender, ethnicity and your political preferences.
- **Confidential Information**: This is sensitive information for companies, that should remain protected via similar methods as personal data. This data could reveal details about the core business model, proprietary practices, customer details (which can also contain personal information!) and internal business processes.

When we define sensitive information as only PII, we tend to ignore other potential targets of sensitive data, that might be just as, if not more valuable!

#### What is sensitive here?

In [None]:
df.sample(0.01).show(3, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------
 air_quality_index           | 0                          
 user_id                     | pfrost                     
 top_pollutant               | CO                         
 top_pollutant_concentration | 71                         
 timestamp                   | 2022-04-07T14:52:45.869188 
 lat                         | 23.15678                   
 long                        | -81.24441                  
 city                        | Varadero                   
 country                     | CU                         
 timezone                    | America/Havana             
 air_quality_category        | 1                          
-RECORD 1-------------------------------------------------
 air_quality_index           | 140                        
 user_id                     | seanpreston                
 top_pollutant               | PM2.5                      
 top_pollutant_concentration | 74                       

In [None]:
df.show()

+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+
|air_quality_index|         user_id|top_pollutant|top_pollutant_concentration|           timestamp|      lat|     long|               city|country|            timezone|air_quality_category|
+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+
|              200|   maynardmorgan|         PM10|                         88|2022-04-07T14:38:...| 10.66668|-61.51889|      Port of Spain|     TT|America/Port_of_S...|                   4|
|              200|millerjacqueline|         PM10|                         45|2022-04-07T14:39:...| 34.25628|-78.04471|             Leland|     US|    America/New_York|                   4|
|                0|           ikirk|          NO2|

#### How might we...?

- Protect user_id while still allowing it to be linkable?
- Remove potentially identifying precision in location?
- Remove potentially identifying information in the timestamp?
- Make these into scalable and repeatable actions for our workflow?

Let's work on these step by step!

# Cryptographic Hashing


- Protect user_id by hashing it's value 

Hashing is a one-way function that takes an input (e.g. a password or other data) and generates a fixed-size output (the hash value) that is unique to that input.The purpose of hashing is to provide data integrity and to protect against tampering or unauthorized modifications of the data.

In [None]:
def hash_column(df, column_name):
    # TODO

[0;36m  File [0;32m"<command-1661023049023575>"[0;36m, line [0;32m1[0m
[0;31m    def hash_column(df, column_name):[0m
[0m                                     ^[0m
[0;31mIndentationError[0m[0;31m:[0m expected an indented block


In [None]:
 def test_hash_column():
     schema = StructType([StructField("user_id", StringType(), True)])
     data = [("12345",), ("67890",), ("abcdef",)]
     df = spark.createDataFrame(data, schema)
        
     hashed_df = hash_column(df, "user_id")
        
     original_values = ["12345", "67890", "abcdef"]
     hashed_values = hashed_df.select("hashed_user_id").rdd.flatMap(lambda x: x).collect()
     for i in range(len(original_values)):
         assert hashed_values[i] != original_values[i], "The number of hashed values has been matched with the original value"
        
test_hash_column()            

In [None]:
df = hash_column(df, 'user_id')
df.show()

+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+--------------------+
|air_quality_index|         user_id|top_pollutant|top_pollutant_concentration|           timestamp|      lat|     long|               city|country|            timezone|air_quality_category|      hashed_user_id|
+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+--------------------+
|              200|   maynardmorgan|         PM10|                         88|2022-04-07T14:38:...| 10.66668|-61.51889|      Port of Spain|     TT|America/Port_of_S...|                   4|ef0e8649d9000ff22...|
|              200|millerjacqueline|         PM10|                         45|2022-04-07T14:39:...| 34.25628|-78.04471|             Leland|     US|    Ameri

# Solution

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import hashlib

def hash_column(df, column_name):
    def sha256_hash(value):
        return hashlib.sha256(str(value).encode('utf-8')).hexdigest()
    udf_sha256_hash = udf(sha256_hash, StringType())
    df = df.withColumn('hashed_' + column_name, udf_sha256_hash(column_name))
    return df

In [None]:
new_df= spark.createDataFrame(df.rdd, df.schema)
new_df.show()

+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+--------------------+
|air_quality_index|         user_id|top_pollutant|top_pollutant_concentration|           timestamp|      lat|     long|               city|country|            timezone|air_quality_category|      hashed_user_id|
+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+--------------------+
|              200|   maynardmorgan|         PM10|                         88|2022-04-07T14:38:...| 10.66668|-61.51889|      Port of Spain|     TT|America/Port_of_S...|                   4|ef0e8649d9000ff22...|
|              200|millerjacqueline|         PM10|                         45|2022-04-07T14:39:...| 34.25628|-78.04471|             Leland|     US|    Ameri

In [None]:
# Example code to show how Fernet works and encrypts a text string. 
 
from cryptography.fernet import Fernet
# >>> Put this somewhere safe!
key = Fernet.generate_key()
f = Fernet(key)
token = f.encrypt(b"secret message")
print(token)

b'gAAAAABkHKYmHdKGVuS1BdOIc2CPMOSSSyPQVprbCTP5wwv9qh86R8N-xrK7sYvql9aYOSjZnQBjj2rLqK1fLxmGU2DJMD-lzQ=='


In [None]:
print(f.decrypt(token).decode())

secret message


In [None]:
# generate the encryption key
Key = Fernet.generate_key()

In [None]:
# Define Encrypt User Defined Function 
def encrypt_user_id(plain_text,MASTER_KEY):
    f = Fernet(MASTER_KEY)
    plain_text_bytes=bytes(plain_text, 'utf-8')
    cipher_text = f.encrypt(plain_text_bytes)
    cipher_text = str(cipher_text.decode('ascii'))
    return cipher_text

In [None]:
import pandas as pd

def test_encrypt_user_id():
    
    plain_df = spark.createDataFrame(
        data=[
            (1,"abcd"),
            (2,"efgh"),
            (3,"ijkl")
        ],
        schema= ["id","user_id"],
    )
       
    key = Fernet.generate_key()
    f = Fernet(key)
    
    encrypted_udf = udf(encrypt_user_id, StringType())    
    encrypted_df = plain_df.withColumn("user_id", encrypted_udf(col('user_id'), lit(key))) 
    
    expected_decrypted_list = [f.decrypt(i['user_id'].encode()).decode() for i in encrypted_df.select(col('user_id')).collect() ]
    
    assert plain_df.select(col('user_id')).collect() != encrypted_df.select(col('user_id')).collect(),"The user_id column has to be encrypted after the encryption logic applied"    
    assert plain_df.rdd.map(lambda x: x.user_id).collect() == expected_decrypted_list,"The decrypted values has to match the plain_df's user_id values"
    
    print("All tests passed")
    
test_encrypt_user_id()    

All tests passed


In [None]:
encrypted_udf = udf(encrypt_user_id, StringType())

In [None]:
df.select(encrypted_udf(col("user_id"),lit(Key)).alias("user_id")).show()

+--------------------+
|             user_id|
+--------------------+
|gAAAAABkHKYzGCGFc...|
|gAAAAABkHKYzOyGO2...|
|gAAAAABkHKYzx4P2E...|
|gAAAAABkHKYzMbfpH...|
|gAAAAABkHKYzyaGO1...|
|gAAAAABkHKYzBg08Y...|
|gAAAAABkHKYzyIhzf...|
|gAAAAABkHKYz5TtI7...|
|gAAAAABkHKYzhycc0...|
|gAAAAABkHKYzjxhiF...|
|gAAAAABkHKYzXOH5I...|
|gAAAAABkHKYzMN5ea...|
|gAAAAABkHKYzbyOA2...|
|gAAAAABkHKYzGNJre...|
|gAAAAABkHKYzP9IoF...|
|gAAAAABkHKYzL7r9X...|
|gAAAAABkHKYz9FTPS...|
|gAAAAABkHKYzASmSe...|
|gAAAAABkHKYzLZAyr...|
|gAAAAABkHKYz3Wdgd...|
+--------------------+
only showing top 20 rows



It looks like it's working, but with UDFs you never know. Remember, Spark function evaluation is LAZY, so it will sample a bit and test. To see if it will work on the entire dataframe, we need to call collect. Let's test it out!

In [None]:
results = df.select(encrypted_udf(col("user_id"),lit(Key)).alias("user_id")).collect()
results

Out[37]: [Row(user_id='gAAAAABkHKY3514ocUr3Wo3_ONSv5jmkw9uQS8o-SPHa3cUuglx8Jv4-9ZvzUp4-uvdrcENbmKiCG5A7MmeylWGPNzEdXpl8YA=='),
 Row(user_id='gAAAAABkHKY3-QPeKTOEJTN4A3uE-6ZnF4Jpyok2MMDAteDvCkpHjjGeWLfnF_SANi5vUqGqgK3XttgT8hCkKtYKHVcxfLS6r9CgtmsGJNyjFCM892_CYj0='),
 Row(user_id='gAAAAABkHKY3-ZQX_V88ylPvpFRxzJUbmM4YhrPK9Lb291jXUo51PhZxuk1lYl9iU4jeaPWL3vgHZqOrO5ZUTZu_fHxiw44kaQ=='),
 Row(user_id='gAAAAABkHKY3Rk1rT8IHB2XO36gGhayQbYwZ4k_Av_RqcfBMYdArCq-pMyH8ogCZZL-2u-re00fXkeuPk5y44lUXodJiGhLbUg=='),
 Row(user_id='gAAAAABkHKY3klXLcOdREucsz_zDGy1uQJ_FoRgtaHNDT-i6VBRMvOIJJWagKAvUGROzZn6sjEo1ObD81kE7DNqguTBsAXtqvw=='),
 Row(user_id='gAAAAABkHKY3i4wEqU4pX3dhqQsJ5inx_IspKsxW5wQammtySO7OWK7KFYhqv06Qp1kVhUP6YQxu4mV-GfjVtIQwptIYmYhzzQ=='),
 Row(user_id='gAAAAABkHKY3KhoYZIWJjhyL_fJB-XgL7__QJl4ktiLkR1Pc3YRhEt-kAoJFcu2-Lt4fKvrK034lYnsRaWGLD1wuSmnjV4Epxw=='),
 Row(user_id='gAAAAABkHKY3SEFyTDa-jZUtbDNvjoNoN6LtYjSIR4dCZFT3IWhNSUYJX_JVyB5tT44xcHxlEVotD-B0DEZbovvNKy3KnHTeCA=='),
 Row(user_id='gAAAAABkHKY3D

This looks like it works now! Let's add it as a column.

In [None]:
df = df.withColumn("user_id", encrypted_udf(col('user_id'), lit(Key))) 

In [None]:
df.head()

Out[39]: Row(air_quality_index=200, user_id='gAAAAABkHKZEc__D4pYpstQX1qWV4CS5VIi03mex-03TXdx4Nzwq-haJRwWsG465wOPXnNShPF3s6L9xnZKjM9qb9hHwsnzNCw==', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', lat=10.666680335998535, long=-61.518890380859375, city='Port of Spain', country='TT', timezone='America/Port_of_Spain', air_quality_category=4, hashed_user_id='ef0e8649d9000ff22cc5cb5b57a4fe8fe5d6782f74ef640d4d1da317c7aa4fba')

Also we could write a function to decrypt. This would be a good homework exercise

In [None]:
# Define decrypt user defined function 
def decrypt_user_id(cipher_text,MASTER_KEY):
    from cryptography.fernet import Fernet
    f = Fernet(MASTER_KEY)
    clear_val=f.decrypt(cipher_text.encode()).decode()
    return clear_val

In [None]:
decrypted_udf = udf(decrypt_user_id, StringType())
dec_df = df.withColumn("user_id", decrypted_udf(col('user_id'), lit(Key))) 
dec_df.head()

Out[41]: Row(air_quality_index=200, user_id='maynardmorgan', top_pollutant='PM10', top_pollutant_concentration='88', timestamp='2022-04-07T14:38:34.869188', lat=10.666680335998535, long=-61.518890380859375, city='Port of Spain', country='TT', timezone='America/Port_of_Spain', air_quality_category=4, hashed_user_id='ef0e8649d9000ff22cc5cb5b57a4fe8fe5d6782f74ef640d4d1da317c7aa4fba')

# Format-Preserving Encryption

In [None]:
key = "mysecretkey"
new_df = fpe_hash_column(new_df, "user_id", key)

In [None]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.testing.sqlutils import ReusedSQLTestCase
def test_fpe_encrpyt_column():
        schema = StructType([
            StructField("id", StringType()),
            StructField("name", StringType())
        ])
        data = [
            Row("1", "John"),
            Row("2", "Jane"),
            Row("3", "Bob"),
            Row("4", "Alice")
        ]
        df = spark.createDataFrame(data, schema)
        key = 'secret_key'
        df_encrypted = fpe_hash_column(df, 'name', key)
        expected_result = [
            ('1', ffx_encrypt(key, 'John')),
            ('2', ffx_encrypt(key, 'Jane')),
            ('3', ffx_encrypt(key, 'Bob')),
            ('4', ffx_encrypt(key, 'Alice'))
        ]
        expected_schema = StructType([
            StructField("id", StringType()),
            StructField("fpe_hashed_name", StringType())
        ])
        expected_df = spark.createDataFrame(expected_result, expected_schema)

        assert df_encrypted.select('id', 'fpe_hashed_name').collect() == expected_df.select('id', 'fpe_hashed_name').collect() , "The user_id column has to be encrypted after the encryption logic applied" 
        
test_fpe_encrpyt_column()

In [None]:
new_df.show()

+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+--------------------+------------------+
|air_quality_index|         user_id|top_pollutant|top_pollutant_concentration|           timestamp|      lat|     long|               city|country|            timezone|air_quality_category|      hashed_user_id|fpe_hashed_user_id|
+-----------------+----------------+-------------+---------------------------+--------------------+---------+---------+-------------------+-------+--------------------+--------------------+--------------------+------------------+
|              200|   maynardmorgan|         PM10|                         88|2022-04-07T14:38:...| 10.66668|-61.51889|      Port of Spain|     TT|America/Port_of_S...|                   4|ef0e8649d9000ff22...|     2JBEWtuqwxPdZ|
|              200|millerjacqueline|         PM10|                         45|20

# Solution

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import pyffx

def ffx_encrypt(key, value):
    if len(value) < 6:
       value += "X" * (6-len(value))
    ffx = pyffx.String(key.encode(), alphabet='abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', length=len(value))
    return ffx.encrypt(value)

def fpe_hash_column(df, column_name, key):
    ffx_udf = udf(lambda x: ffx_encrypt(key, x), StringType())
    df = df.withColumn("fpe_hashed_" + column_name, ffx_udf(col(column_name)))
    return df

# Format-Preserving Decryption

In [None]:
def fpe_decrypt_column
# Todo

In [None]:
df_decrypted = fpe_decrypt_column(new_df ,"fpe_hashed_user_id", key)
df_decrypted.select("user_id", "fpe_decrypted_user_id").show()

+----------------+---------------------+
|         user_id|fpe_decrypted_user_id|
+----------------+---------------------+
|   maynardmorgan|        maynardmorgan|
|millerjacqueline|     millerjacqueline|
|           ikirk|               ikirkX|
| williamszachary|      williamszachary|
|        sfarrell|             sfarrell|
|     mariamartin|          mariamartin|
|   michaelfuller|        michaelfuller|
|       acummings|            acummings|
|    glorialandry|         glorialandry|
|      martinjohn|           martinjohn|
|  karenmaldonado|       karenmaldonado|
|     aaronholden|          aaronholden|
|        johnny58|             johnny58|
|   sanderssummer|        sanderssummer|
|   bradleymosley|        bradleymosley|
|   atkinsrussell|        atkinsrussell|
|          hlewis|               hlewis|
|  crystalcarlson|       crystalcarlson|
|        sharon04|             sharon04|
|        tjohnson|             tjohnson|
+----------------+---------------------+
only showing top

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

def test_fpe_decrypt_column():
    schema = StructType([
        StructField("id", StringType()),
        StructField("name", StringType())
    ])
    data = [
        Row("1", "John"),
        Row("2", "Jane"),
        Row("3", "Bob"),
        Row("4", "Alice")
    ]
    df = spark.createDataFrame(data, schema)
    key = 'secret_key'
    df_encrypted = fpe_hash_column(df, 'name', key)
    df_decrypted = fpe_decrypt_column(df_encrypted, "fpe_hashed_name",key)
        
    actual_rows = sorted(df_decrypted.select("id", "fpe_decrypted_user_id").collect(), key=lambda row: row.id)
    expected_rows = sorted(df.collect(), key=lambda row: row.id)
    assert len(actual_rows) == len(expected_rows)

test_fpe_decrypt_column()

# Solution

In [None]:
def ffx_decrypt(key, value):
    ffx = pyffx.String(key.encode(), alphabet='abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', length=len(value))
    return ffx.decrypt(value)
 
def fpe_decrypt_column(df, column_name, key):
    ffx_udf = udf(lambda x: ffx_decrypt(key, x), StringType())
    df = df.withColumn("fpe_decrypted_user_id", ffx_udf(col(column_name)))
    return df

Now we can move onto our GPS data!

How precise is GPS data anyways? 🤔 (from [wikipedia](https://en.wikipedia.org/wiki/Decimal_degrees))


decimal places  | degrees  |distance
------- | -------          |--------
0        |1                |111  km
1        |0.1              |11.1 km
2        |0.01             |1.11 km
3        |0.001            |111  m
4        |0.0001           |11.1 m
5        |0.00001          |1.11 m
6        |0.000001         |11.1 cm
7        |0.0000001        |1.11 cm
8        |0.00000001       |1.11 mm

In [None]:
df.show(2, vertical=True)

-RECORD 0-------------------------------------------
 air_quality_index           | 200                  
 user_id                     | gAAAAABkHKbK_IKPz... 
 top_pollutant               | PM10                 
 top_pollutant_concentration | 88                   
 timestamp                   | 2022-04-07T14:38:... 
 lat                         | 10.66668             
 long                        | -61.51889            
 city                        | Port of Spain        
 country                     | TT                   
 timezone                    | America/Port_of_S... 
 air_quality_category        | 4                    
 hashed_user_id              | ef0e8649d9000ff22... 
-RECORD 1-------------------------------------------
 air_quality_index           | 200                  
 user_id                     | gAAAAABkHKbKi4uW-... 
 top_pollutant               | PM10                 
 top_pollutant_concentration | 45                   
 timestamp                   | 2022-04-07T14:3

To reduce precision in Spark, take a look at F.pys_round !

In [None]:
#TODO reduce precision for the lat and long columns!
def remove_column_precision(df,column_name):
    #TODO
    pass

All tests passed :)


In [None]:
def test_remove_column_precision():
  pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2, 3]),
          'lat': pd.Series([40.1234567, 42.9876543, 45.67890]),
      }
  )
  expected_pandas_df = pd.DataFrame(
      {
          'id':  pd.Series([1, 2, 3]),
          'lat': pd.Series([40.123, 42.988, 45.679]),
      }
  )
  spark_df = spark.createDataFrame(pandas_df)
  expected_df = spark.createDataFrame(expected_pandas_df)

  fixed_df = remove_column_precision(spark_df,'lat')

  assert fixed_df.schema == expected_df.schema
  assert fixed_df.collect() == expected_df.collect()

  print("All tests passed :)")
  
test_remove_column_precision()

All tests passed :)


##### Solution

In [None]:
#TODO reduce precision for the lat and long columns!
def remove_column_precision(df,column_name):
    return df.withColumn(column_name, F.round(column_name, 3))

Once you get the above working without error, you can fix the lat and long columns in your dataframe.

In [None]:
df = remove_column_precision(df,'lat')
df = remove_column_precision(df,'long')

What type of risk should we be aware of with regard to timestamp precision? When and how do we need to de-risk this  type of information?

In [None]:
df = df.withColumn('timestamp', F.to_timestamp('timestamp'))


Try adding a new column called new_timestamp where you use F.unix_timestamp and F.rand to build a small amount of noise...

In [None]:
new_df = cleaned_df
  .withColumn("new_timestamp", #TODO: finish this to add the column to your DF once you have experimented and like the result!)
  .select(col("timestamp"), col("new_timestamp"))

new_df.show(false)

[0;36m  File [0;32m"<command-4276697378240479>"[0;36m, line [0;32m2[0m
[0;31m    .withColumn("new_timestamp", #TODO: finish this to add the column to your DF once you have experimented and like the result!)[0m
[0m    ^[0m
[0;31mIndentationError[0m[0;31m:[0m unexpected indent


In [None]:
from pyspark.sql.functions import to_timestamp, unix_timestamp, rand

def test_add_random_time_intervals():
    # create a sample DataFrame
    df = spark.createDataFrame([
        ('2022-01-01 00:00:00', 1),
        ('2022-01-01 01:00:00', 2),
        ('2022-01-01 02:00:00', 3),
        ('2022-01-01 03:00:00', 4),
        ('2022-01-01 04:00:00', 5),
    ], ['timestamp', 'value'])
    cleaned_df = df.withColumn('timestamp', (unix_timestamp('timestamp') + 
                            (rand() * 60) + (rand() * 60 * 20)).cast('timestamp'))
    assert 'timestamp' in cleaned_df.columns
    assert 'value' in cleaned_df.columns
    assert cleaned_df.filter(cleaned_df.timestamp != df.timestamp).count() == df.count()
test_add_random_time_intervals()

##### Solution

In [None]:
df = df.withColumn("timestamp", (unix_timestamp(col("timestamp")) +
                  (rand() * 60) + (rand() * 60 * 20)).cast("timestamp"))

Now we want to reorder so the data comes into the map in the proper order (for timeseries analysis)

In [None]:
df = df.orderBy(df.timestamp.asc())


In [None]:
df.write.format('csv').save(f'{root_dir}/data/data_for_map.csv')

This is a graphic from Wikipedia showing the so-called "CIA" triad, showing some of the core concepts we want to ensure to guarantee data security. Let's review them together:

- **Confidentiality:** Data is kept confidential, meaning only those who should be able to access it can do so, and fine-tuned access is available and enforced.
- **Integrity:** Data is accurate and cannot easily be changed or tampered with by internal or external actors in a malicious way.
- **Availability:** Data fulfills any service-level objectives (SLOs) or service-level agreements (SLAs) and is made available in a secure and user-friendly manner. 

So translated into data engineering context, this means that:

- Our data workflows enforce access-control restrictions, data protections or minimizations related to confidentiality and ensure sinks and sources match the encryption requirements we expect for the data sensitivity.
- Our data workflows do not mangle data, maintain data quality principles outlined by governance processes and alert should malicious activity be detected.
- Our data wofkflows meet SLOs/SLAs outlined by the data consumers and dependant data products.

### What about Privacy? 🦹🏻

A foundational concept when it comes to designing privacy-respecting systems is the Privacy by Design principles outlined by [Anne Cavoukian in 2006](https://iapp.org/media/pdf/resource_center/pbd_implement_7found_principles.pdf).

Let's pull out a few of the principles that relate to our work as data engineers...

- **Proactive not Reactive; Preventative not Remedial:** Privacy is built into our architecture and data flows as we start building them. Think of this as the privacy version of TDD -- we write the privacy requirements first and design and build systems to fit them, not the other way around!
- **Privacy as the Default Setting:** We optimize systems so that privacy is on by default, and changes to that are user-driven! This means tracking things like consent, implementing processes for data minimization and ensuring lineage and governance data is available to data consumers or dependant data products.
- **Full Functionality – Positive-Sum, not Zero-Sum:** Data privacy is a benefit for the business, technologists and users, meaning we ensure that it is not a tradeoff in our product design. Users who choose privacy protections (or users who have them on automatically, by default, right?) receive full functionality.
- **End-to-End Security – Full Lifecycle Protection:** Data is secured properly and for it's entire lifecycle (from collection endpoint to deletion!). Here is our big intersection with the security requirements.


What does this mean for our data engineering work?

- Our data workflows have privacy protections outlined and architected in before any code is written. We test for these and ensure they are working properly, should anything change.
- Privacy is turned on by default, and any "unknown" data flows have privacy added to them when they enter into our systems or are discovered (e.g. in cases of unknown data storages or data from third parties).
- We work directly with data producers and consumers (and other stakeholders, such as legal or privacy professionals) to find sweet spots that offer the appropriate protection for users and utility for business needs. Approach this as a postive-sum game and remember that user-centric choices are always a good business investment.
- We design secure workflows that ensure that all user-related or person-related data is properly secured using standards from data security best practices (like our CIA triad!)


#### Privacy and Information Continuum

One useful way to begin shifting your understanding of privacy is to start thinking about it as a point on a spectrum instead of something that is "on" or "off". Here we can see that we can have a full range of points on a continuum, where privacy and information are somewhat at odds with one another. When we have full information, we have no privacy guarantees. When we have complete privacy, we cannot do our job as data people! Finding the right balance is the difficult and fun part of privacy in data science!

### Congratulations!! 

You've walked through potential privacy snags and helped increase the protection for the individuals sending you their air quality details! Now developers can use this dataset and we have ensured that there are some base protections. As you may have noticed, it wasn't always obvious what we should do -- but by thinking through each data type and determining what worked to balance the utility of the data and the privacy we want to offer, we were able to find some ways to protect individuals. 

A good set of questions to ask for guidance is:

- Where will this data be accessed and used? How safe is this environment?
- What person-related data do we actually need to use to deliver this service or product? (data minimization!)
- What other protections will be added to this data before it is seen or used? (i.e. encryption at rest, access control systems, or other protections when it reaches another processing point or sink!)
- What privacy and security expectations do we want to set for the individuals in this dataset?
- Where can we opportunistically add more protection while not hindering the work of data scientists, data analysts, software engineers and other colleagues?


As you continue on in your data engineering journey, you'll likely encounter many more situations where you'll need to make privacy and security decisions. If you'd like to learn more and even work as a privacy or security champion -- feel free to join in your organizations' programs to support topics like this!