<a href="https://colab.research.google.com/github/Harish2303/ISCP_2.0/blob/main/ISCP_2.0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
# Cell 1: Upload your CSV file (run this cell and upload your iscp_pii_dataset_-_Sheet1.csv)
from google.colab import files
uploaded = files.upload()

Saving iscp_pii_dataset_-_Sheet1.csv to iscp_pii_dataset_-_Sheet1.csv
Saving mermaid-diagram-2025-08-14-130116.png to mermaid-diagram-2025-08-14-130116.png


In [9]:
# Cell 2: Rename file for easier handling (run if your file name is different)
import os
os.rename('iscp_pii_dataset_-_Sheet1.csv', 'iscp_pii_dataset.csv')

In [28]:
# Cell 3: Define PII detection & redaction functions and main logic

import csv
import json
import re

# Regex patterns
PHONE_PATTERN = re.compile(r'\b\d{10}\b')
AADHAR_PATTERN = re.compile(r'\b\d{12}\b')
PASSPORT_PATTERN = re.compile(r'\b[A-Z][0-9]{7}\b', re.IGNORECASE)
UPI_PATTERN = re.compile(r'\b[\w\d._%-]+@[\w\d.-]+\b')

def mask_phone(phone):
    return phone[:2] + 'XXXXXX' + phone[-2:]

def mask_aadhar(aadhar):
    return aadhar[:4] + 'XXXXXXXX' + aadhar[-0:]

def mask_passport(passport):
    return passport[0] + 'XXXXXXX'

def mask_upi(upi):
    parts = upi.split('@')
    masked_user = parts[0][:2] + 'XXXX' + parts[0][-1] if len(parts[0]) > 3 else 'XXXX'
    return masked_user + '@' + parts[1]

def mask_name(name):
    parts = name.split()
    if len(parts) < 2:
        return parts[0][0] + 'X'*(len(parts[0])-1)
    first, last = parts[0], parts[-1]
    masked_first = first[0] + 'X'*(len(first)-1)
    masked_last = last[0] + 'X'*(len(last)-1)
    if len(parts) > 2:
        middle_mask = ' '.join(['X'*len(p) for p in parts[1:-1]])
        return masked_first + ' ' + middle_mask + ' ' + masked_last
    else:
        return masked_first + ' ' + masked_last

def mask_email(email):
    parts = email.split('@')
    username = parts[0]
    if len(username) <= 2:
        masked_user = 'XX'
    else:
        masked_user = username[:2] + 'XXX' + username[-1]
    return masked_user + '@' + parts[1]

def mask_address(address):
    return re.sub(r'\d', 'X', address)

def mask_ip(ip):
    parts = ip.split('.')
    if len(parts) == 4:
        return '.'.join(parts[:2] + ['XXX', 'XXX'])
    else:
        return 'XXX.XXX.XXX.XXX'

def check_standalone_pii(record):
    pii_found = {}
    phone = record.get('phone')
    if phone and PHONE_PATTERN.fullmatch(phone):
        pii_found['phone'] = phone

    aadhar = record.get('aadhar')
    if aadhar and AADHAR_PATTERN.fullmatch(aadhar):
        pii_found['aadhar'] = aadhar

    passport = record.get('passport')
    if passport and PASSPORT_PATTERN.fullmatch(passport):
        pii_found['passport'] = passport

    upi_id = record.get('upi_id')
    if upi_id and UPI_PATTERN.fullmatch(upi_id):
        pii_found['upi_id'] = upi_id

    return pii_found

def contains_combinatorial_pii(record):
    comb_pii_fields = ['name', 'email', 'address', 'device_id', 'ip_address']
    found = {field: record.get(field) for field in comb_pii_fields if record.get(field)}
    return found if len(found) >= 2 else {}

def redact_record(record):
    pii_detected = False
    redacted = record.copy()

    standalone = check_standalone_pii(record)
    for key, value in standalone.items():
        pii_detected = True
        if key == 'phone':
            redacted[key] = mask_phone(value)
        elif key == 'aadhar':
            redacted[key] = mask_aadhar(value)
        elif key == 'passport':
            redacted[key] = mask_passport(value)
        elif key == 'upi_id':
            redacted[key] = mask_upi(value)

    combo = contains_combinatorial_pii(record)
    if combo:
        pii_detected = True
        if 'name' in combo:
            redacted['name'] = mask_name(combo['name'])
        if 'email' in combo:
            redacted['email'] = mask_email(combo['email'])
        if 'address' in combo:
            redacted['address'] = mask_address(combo['address'])
        if 'ip_address' in combo:
            redacted['ip_address'] = mask_ip(combo['ip_address'])
        if 'device_id' in combo:
            device_id_val = combo['device_id']
            redacted['device_id'] = 'X'*len(device_id_val)

    return redacted, pii_detected

def run_redaction(input_csv='iscp_pii_dataset.csv', output_csv='iscp_pii_dataset_redacted_output.csv'):
    with open(input_csv, mode='r', encoding='utf-8') as f_in, \
         open(output_csv, mode='w', encoding='utf-8', newline='') as f_out:
        reader = csv.DictReader(f_in)
        writer = csv.DictWriter(f_out, fieldnames=['record_id', 'redacted_data_json', 'is_pii'])
        writer.writeheader()

        for row in reader:
            record_id = row['record_id']
            data_json_str = row['data_json'].replace('""', '"') # Unescape double quotes
            data = json.loads(data_json_str)
            redacted_data, pii_flag = redact_record(data)
            writer.writerow({
                'record_id': record_id,
                'redacted_data_json': json.dumps(redacted_data),
                'is_pii': pii_flag
            })

    print(f"Redaction complete. Output saved to {output_csv}")

In [29]:
with open('iscp_pii_dataset.csv', 'r', encoding='utf-8') as f:
    header = f.readline().strip()
    print("CSV Headers:", header.split(','))

CSV Headers: ['record_id', 'data_json']


In [30]:
import csv
with open('iscp_pii_dataset.csv', 'r', encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    print(reader.fieldnames)  # prints list of column names

['record_id', 'data_json']


In [31]:
with open('iscp_pii_dataset.csv', 'r', encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    print("Columns found:", reader.fieldnames)

Columns found: ['record_id', 'data_json']


In [35]:
import csv
import json

def print_data_json_column(input_csv='iscp_pii_dataset.csv'):
    with open(input_csv, mode='r', encoding='utf-8') as f_in:
        reader = csv.DictReader(f_in)
        print("Content of 'data_json' column:")
        for i, row in enumerate(reader):
            if 'data_json' in row:
                print(f"Row {i+1}: {row['data_json']}")
            else:
                print(f"Row {i+1}: 'data_json' column not found in this row.")

print_data_json_column()

Content of 'data_json' column:
Row 1: {"customer_id": "CUST001", "phone": "9876543210", "order_value": 1299}
Row 2: {"name": "Rajesh Kumar", "email": "rajesh.kumar@email.com", "city": "Mumbai"}
Row 3: {"first_name": "Priya", "product": "iPhone 14", "category": "Electronics"}
Row 4: {"aadhar": "123456789012", "transaction_type": "purchase"}
Row 5: {"email": "standalone@email.com", "product_id": "PROD123"}
Row 6: {"name": "Amit Singh", "address": "123 MG Road, Bangalore, 560001", "ip_address": "192.168.1.100"}
Row 7: {"city": "Delhi", "pin_code": "110001", "product_category": "Fashion"}
Row 8: {"upi_id": "user123@paytm", "amount": 500}
Row 9: {"last_name": "Sharma", "order_id": "ORD789456"}
Row 10: {"passport": "P1234567", "booking_reference": "BK123"}
Row 11: {"device_id": "DEV456789", "app_version": "2.1.4"}
Row 12: {"name": "Sneha Patel", "email": "sneha.patel@gmail.com", "device_id": "MOB123456"}
Row 13: {"transaction_id": "TXN987654", "amount": 2500, "status": "completed"}
Row 14: {

In [37]:
# Cell 5: Display first 5 rows of redacted output CSV
import pandas as pd
df = pd.read_csv('iscp_pii_dataset_redacted_output.csv')
df.head(500)

Unnamed: 0,record_id,redacted_data_json,is_pii
0,1,"{""customer_id"": ""CUST001"", ""phone"": ""98XXXXXX1...",True
1,2,"{""name"": ""RXXXXX KXXXX"", ""email"": ""raXXXr@emai...",True
2,3,"{""first_name"": ""Priya"", ""product"": ""iPhone 14""...",False
3,4,"{""aadhar"": ""1234XXXXXXXX123456789012"", ""transa...",True
4,5,"{""email"": ""standalone@email.com"", ""product_id""...",False
...,...,...,...
166,167,"{""monitoring_alert"": ""high_memory_usage"", ""thr...",False
167,168,"{""name"": ""VXXXXX AXXXXXX"", ""address"": ""XXX Com...",True
168,169,"{""load_balancer"": ""nginx"", ""upstream_servers"": 4}",False
169,170,"{""passport"": ""FXXXXXXX"", ""travel_history"": ""US...",True


In [25]:
# Cell 6: Download the redacted output file
from google.colab import files
files.download('iscp_pii_dataset_redacted_output.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>