<a href="https://colab.research.google.com/github/balasaireddy/fetch-assignment/blob/master/Percidio_Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
!pip install tabulate # Install the tabulate library
import pandas as pd
from tabulate import tabulate # Import the tabulate function

# Replace 'your_file.csv' with the actual filename
df = pd.read_csv('/content/synthetic_pii_pci_data.csv')
print(tabulate(df.head(10), headers='keys', tablefmt='pretty'))

+---+-----------------+------------------------------+---------------------+------------------------------+-------------+---------------------+--------------------+-----------------+----------------------------+-----------------------------------------+---------------+-----------------+--------------------+
|   |      name       |           address            |    phone_number     |            email             |     ssn     | credit_card_number  | credit_card_expiry | credit_card_cvv |          company           |                job_title                | date_of_birth |   ip_address    |    bank_account    |
+---+-----------------+------------------------------+---------------------+------------------------------+-------------+---------------------+--------------------+-----------------+----------------------------+-----------------------------------------+---------------+-----------------+--------------------+
| 0 |  Allison Hill   |      819 Johnson Course      |    886.737.9402   

In [2]:
!pip install pyspark
!pip install presidio-analyzer presidio-anonymizer
!pip install spacy
!python -m spacy download en_core_web_lg


Collecting presidio-analyzer
  Downloading presidio_analyzer-2.2.355-py3-none-any.whl.metadata (2.9 kB)
Collecting presidio-anonymizer
  Downloading presidio_anonymizer-2.2.355-py3-none-any.whl.metadata (8.2 kB)
Collecting phonenumbers<9.0.0,>=8.12 (from presidio-analyzer)
  Downloading phonenumbers-8.13.49-py2.py3-none-any.whl.metadata (10 kB)
Collecting tldextract (from presidio-analyzer)
  Downloading tldextract-5.1.3-py3-none-any.whl.metadata (11 kB)
Collecting azure-core (from presidio-anonymizer)
  Downloading azure_core-1.32.0-py3-none-any.whl.metadata (39 kB)
Collecting pycryptodome>=3.10.1 (from presidio-anonymizer)
  Downloading pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting requests-file>=1.4 (from tldextract->presidio-analyzer)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading presidio_analyzer-2.2.355-py3-none-any.whl (109 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, current_timestamp
from pyspark.sql.types import StringType
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import pandas as pd

In [5]:
# Spark session used on Google Colab. The Arrow batch cap bounds how many rows
# each pandas_udf invocation receives at once.
SPARK_CONF = {
    "spark.driver.memory": "6g",
    "spark.executor.memory": "6g",
    "spark.executor.cores": "2",
    "spark.sql.execution.arrow.maxRecordsPerBatch": 10000,
}

spark_builder = SparkSession.builder.appName("PII Anonymization Pipeline")
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()


In [6]:
# Initialize Presidio engines.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Custom recognizers for insurance policy numbers and effective dates.
# The lookarounds stop an exact-length pattern from matching a slice of a longer
# token (e.g. 8 digits taken out of a 12-digit number, which would otherwise be
# only partially masked).
policy_number_pattern = Pattern(
    name="policy_number_pattern",
    regex=r"(?<!\d)\d{8}(?!\d)",
    score=1.0,
)
property_policy_number_pattern = Pattern(
    name="property_policy_number_pattern",
    regex=r"(?<![A-Za-z])[A-Z]{3}\d{6}(?!\d)",
    score=1.0,
)
# MM/DD/YYYY effective dates: month 01-12, day 01-31, year 1900-2099.
time_pattern = Pattern(
    name="effective_timestamp_pattern",
    regex=r"(?<!\d)(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])/(?:19|20)\d{2}(?!\d)",
    score=1.0,
)

policy_number_recognizer = PatternRecognizer(
    supported_entity="POLICY_NUMBER",
    patterns=[policy_number_pattern, property_policy_number_pattern],
)
time_recognizer = PatternRecognizer(supported_entity="EFFECTIVE_DATE", patterns=[time_pattern])

# Register custom recognizers
analyzer.registry.add_recognizer(policy_number_recognizer)
analyzer.registry.add_recognizer(time_recognizer)

# Mask text per entity type. Every entity the analyzer can emit gets an explicit
# "**<ENTITY>**" replacement so the output format is uniform; without an entry,
# Presidio falls back to its default replacement ("<ENTITY_TYPE>"). A few types
# get friendlier labels. The built-in "replace" operator is used instead of
# "custom" lambdas: same output, and the configs are plain picklable objects.
_entity_labels = {
    "PERSON": "**NAME**",
    "LOCATION": "**LOCATION**",
    "EMAIL_ADDRESS": "**EMAIL**",
    "PHONE_NUMBER": "**PHONE_NUMBER**",
    "POLICY_NUMBER": "**POLICY_NUMBER**",
    "EFFECTIVE_DATE": "**EFFECTIVE_DATE**",
    "DATE_TIME": "**DATE_TIME**",
}
custom_anonymization_operators = {
    entity: OperatorConfig("replace", {"new_value": _entity_labels.get(entity, f"**{entity}**")})
    for entity in set(analyzer.get_supported_entities(language="en")) | set(_entity_labels)
}



In [10]:
# Broadcast the Presidio engines so executors read one shared copy instead of
# receiving the engines inside every task closure.
spark_context = spark.sparkContext
broadcast_analyzer = spark_context.broadcast(analyzer)
broadcast_anonymizer = spark_context.broadcast(anonymizer)

# Column-name substrings identifying audit/bookkeeping fields; columns whose
# lower-cased name contains any of these are excluded from anonymization.
audit_field_patterns = [
    "timestamp",
    "created_by",
    "updated_at",
    "modified_by",
]

@pandas_udf(StringType())
def analyze_and_anonymize(text_series: pd.Series) -> pd.Series:
    """Mask PII in one batch of a column using the broadcast Presidio engines.

    Each non-blank cell is analyzed with the broadcast ``AnalyzerEngine``
    (language ``"en"``). Detected entities are replaced according to
    ``custom_anonymization_operators``, and cells with no detections are
    returned unchanged.

    Args:
        text_series: One batch of the column being anonymized.

    Returns:
        An object-dtype Series with the same length and index as the input.
        ``None``/``NaN`` cells come back as ``None``. Empty and whitespace-only
        strings come back unchanged. Non-string scalars (dates, booleans, ...)
        are converted with ``str()`` before analysis, because the UDF's return
        type is a string.
    """
    analyzer = broadcast_analyzer.value
    anonymizer = broadcast_anonymizer.value

    def _anonymize_cell(value):
        # Nulls carry no PII; pass them through as nulls.
        if value is None:
            return None
        if not isinstance(value, str):
            # Missing non-string scalars (NaN/NaT) are nulls too; anything else
            # is stringified rather than crashing on `.strip()`.
            if pd.api.types.is_scalar(value) and pd.isna(value):
                return None
            value = str(value)
        # Empty / whitespace-only cells: nothing to analyze.
        if not value.strip():
            return value

        # Step 1: Analyze the text to detect PII entities
        results = analyzer.analyze(text=value, language="en")
        if not results:
            return value

        # Step 2: Anonymize detected PII entities using the configured operators
        return anonymizer.anonymize(
            text=value,
            analyzer_results=results,
            operators=custom_anonymization_operators,
        ).text

    # dtype=object keeps an empty batch from becoming float64 for a string UDF.
    return pd.Series(
        [_anonymize_cell(value) for value in text_series],
        index=text_series.index,
        dtype=object,
    )

In [None]:
csv_path = '/content/synthetic_pii_pci_data.csv'
# NOTE(review): nothing in this notebook mounts Google Drive; without
# `drive.mount('/content/drive')` Spark writes to a plain local directory here.
output_path = '/content/drive/MyDrive/anonymized_output.csv'

# Step 1: Load the raw data with every column as a string. Schema inference would
# turn identifiers (card/account numbers, CVVs, ...) into numbers. That drops
# leading zeros and may render large values in scientific notation before they
# ever reach the anonymizer.
df = spark.read.csv(csv_path, header=True, inferSchema=False)

# Step 2: Audit/bookkeeping columns (matched by name) are passed through untouched.
excluded_columns = [
    name for name in df.columns
    if any(pattern in name.lower() for pattern in audit_field_patterns)
]

# Step 3: Guarantee every column sent to the UDF is a string. This is a no-op for
# the all-string read above, but it protects the UDF if schema inference is
# re-enabled. It covers dates, timestamps, booleans and decimals, not only numerics.
for column in df.columns:
    if column not in excluded_columns and not isinstance(df.schema[column].dataType, StringType):
        df = df.withColumn(column, col(column).cast("string"))

# Step 4: Everything that is not an audit column is treated as potential PII.
pii_columns = [name for name in df.columns if name not in excluded_columns]

# Step 5: Anonymize all PII columns in one projection. A loop of withColumn calls
# builds one nested projection per column. Column order is preserved.
anonymized_df = df.select(*[
    analyze_and_anonymize(col(name)).alias(name) if name in pii_columns else col(name)
    for name in df.columns
])

# Step 6: Add ingestion metadata (time at which the query is evaluated).
anonymized_df = anonymized_df.withColumn("ingestion_timestamp", current_timestamp())

# Step 7: Persist the anonymized data, replacing any previous output. Spark writes
# a *directory* with this name containing part-*.csv files.
# For a quick visual check first: anonymized_df.show(20, truncate=False)
anonymized_df.write.csv(output_path, mode='overwrite', header=True)