### Introduction
This notebook demonstrates how to run Personally Identifiable Information (PII) detection over a set of documents using Azure's Cognitive Services. Documents are submitted in batches of 5, and the submission loop pauses periodically to stay within the rate limits specified for the S / Multi-service tier.

### Setup
First, we need to import the necessary libraries.

In [75]:
import os
import time
import pandas as pd
import ast
import requests
import json


### Load the Data
Load the JSON data and the list of test indices.

In [76]:
def read_json(path='data/obfuscated_data_06.json'):
    """
    Load a records-oriented JSON file as a pandas DataFrame.

    Parameters:
        path (str): Location of the JSON file to read.

    Returns:
        pd.DataFrame: The parsed contents of the file.
    """
    return pd.read_json(path, orient="records", encoding='utf-8')

def load_test_indices(path='data/test_indices_2.txt'):
    """
    Read a Python literal (expected to be a list of indices) from a text file.

    Parameters:
        path (str): Location of the text file holding the literal.

    Returns:
        list: The value parsed from the file contents.
    """
    with open(path, 'r') as handle:
        raw_text = handle.read()
    # literal_eval only accepts Python literals, so arbitrary code in the
    # file is rejected rather than executed.
    return ast.literal_eval(raw_text.strip())

# Load the document corpus and the list of row positions to send for analysis
df = read_json()
test_indices_2 = load_test_indices()

print(f"Total Documents Loaded: {len(df)}")
print(f"Total Test Indices: {len(test_indices_2)}")


Total Documents Loaded: 22688
Total Test Indices: 13613


### Send the PII Detection Request
Construct the JSON payload and send a POST request to Azure's PII Detection API.

In [77]:
# Configuration Parameters
# The key and endpoint are read from a local JSON file so they are not stored
# in the notebook itself.
with open('input/secret.json', 'r', encoding='UTF-8') as file:
    secret_json = json.load(file)
API_KEY = secret_json['LANGUAGE_KEY']
# Normalise the trailing slash so the job URL is well-formed whether or not
# LANGUAGE_ENDPOINT ends with '/'.
API_ENDPOINT = secret_json['LANGUAGE_ENDPOINT'].rstrip('/') + '/language/analyze-text/jobs'

API_VERSION = "2023-04-15-preview"
SHOW_STATS = "True"

def send_pii_detection_request(batch, df, batch_number, request_timeout=30):
    """
    Sends a PII detection request for a batch of documents.

    Each document is looked up positionally in ``df`` (``df.iloc``) and sent
    with its position, as a string, as the document id.

    Parameters:
        batch (list): List of document indices (row positions in ``df``).
        df (pd.DataFrame): DataFrame containing the documents in a
            ``full_text`` column.
        batch_number (int): The current batch number (used in display names
            and log messages only).
        request_timeout (float): Seconds to wait for the HTTP request before
            giving up. Without a timeout a stalled connection would block
            the submission loop indefinitely.

    Returns:
        str: Operation location URL if the request is accepted (HTTP 202);
        None if the request is rejected, times out or raises.
    """
    documents = [
        {
            "id": str(doc_idx),  # The API expects string ids
            "language": "en",
            "text": df.iloc[doc_idx]['full_text'],
        }
        for doc_idx in batch
    ]

    # Construct the payload
    payload = {
        "displayName": f"Document PII Detection Task Batch {batch_number}",
        "analysisInput": {
            "documents": documents
        },
        "tasks": [
            {
                "kind": "PiiEntityRecognition",
                "taskName": f"Document PII Detection Task {batch_number}",
                "parameters": {
                    "model-version": "latest",
                    "piiCategories": ["Person", "URL", "Email", "PhoneNumber"]
                }
            }
        ]
    }

    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": API_KEY
    }

    try:
        response = requests.post(
            f"{API_ENDPOINT}?api-version={API_VERSION}&showStats={SHOW_STATS}",
            headers=headers,
            data=json.dumps(payload),
            timeout=request_timeout
        )

        if response.status_code == 202:
            operation_location = response.headers.get("operation-location")
            print(f"Batch {batch_number}: Request accepted.")
            print(f"Operation Location: {operation_location}")
            return operation_location

        print(f"Batch {batch_number}: Failed with status code {response.status_code}.")
        print(f"Response: {response.text}")
        return None
    except Exception as e:
        # Best-effort by design: the caller records a None result as a failed
        # batch and carries on with the remaining batches.
        print(f"Batch {batch_number}: Exception occurred - {str(e)}")
        return None


In [95]:
# Batches whose submission request was rejected or raised an exception
send_error_log = []
# Operation-location URLs of jobs that reported a "failed" status when polled
receive_error_log = []
# One entry per submitted batch: its operation-location URL, or 'NONE' on failure
job_ids = []
# The batch of document indices submitted at the same position as in job_ids
batch_ids = []

In [79]:
from torch.utils.data import Dataset, DataLoader 

class doc_dataset(Dataset):
    """Dataset wrapper that exposes a sequence of document indices by position."""

    def __init__(self, indices):
        # The sequence is stored as given; no copy is made.
        self.indices = indices

    def __len__(self):
        """Return how many indices the dataset holds."""
        return len(self.indices)

    def __getitem__(self, idx):
        """Return the document index stored at position ``idx``."""
        return self.indices[idx]

datas = doc_dataset(test_indices_2)
loader = DataLoader(datas, batch_size = 5)

# Create the output folder up front: if it were missing, the append below
# would fail only after a job had already been accepted, losing its URL.
os.makedirs('output', exist_ok=True)

for i, batch in enumerate(loader):
    batch_ids.append(batch)
    # Throttle submissions: a short pause every 50 requests and an additional
    # long pause every 250 requests (never before the very first request).
    if i != 0 and i % 50 == 0:
        time.sleep(3)
        if i % 250 == 0:
            time.sleep(90)
    # The loader yields a tensor; convert it to plain ints for indexing/logging.
    batch = batch.tolist()
    print(batch)
    operation_location = send_pii_detection_request(batch, df, batch_number=i)
    if operation_location is None:
        send_error_log.append(batch)
        # Placeholder keeps job_ids aligned one-to-one with batch_ids.
        job_ids.append('NONE')
        print(f"batch with file indices {batch}, failed to load")
    else:
        # Persist each URL immediately so accepted jobs survive a kernel crash.
        with open('output/operation_locations.txt','a') as file:
            file.write(f'{operation_location}\n')
        job_ids.append(operation_location)

# runtime: 33m 35.3s

[2, 5, 6, 7, 9]
Batch 0: Request accepted.
Operation Location: https://azure-pii-detection.cognitiveservices.azure.com/language/analyze-text/jobs/1eb6f2f1-5f5d-49d2-8cad-9ef5a0bf7116?api-version=2023-04-15-preview
[10, 11, 12, 13, 14]
Batch 1: Request accepted.
Operation Location: https://azure-pii-detection.cognitiveservices.azure.com/language/analyze-text/jobs/1151c083-5ed7-4677-af3c-21f8754c5524?api-version=2023-04-15-preview
[16, 17, 18, 19, 21]
Batch 2: Request accepted.
Operation Location: https://azure-pii-detection.cognitiveservices.azure.com/language/analyze-text/jobs/5bfbd9e8-a5b4-481b-bc29-b4910bb3b505?api-version=2023-04-15-preview
[22, 24, 25, 26, 27]
Batch 3: Request accepted.
Operation Location: https://azure-pii-detection.cognitiveservices.azure.com/language/analyze-text/jobs/66ad6d9e-a9ee-421a-8375-5f0f9b26dbb3?api-version=2023-04-15-preview
[28, 30, 31, 32, 35]
Batch 4: Request accepted.
Operation Location: https://azure-pii-detection.cognitiveservices.azure.com/langu

In [102]:
# Inspect the error logs and compare the number of recorded jobs with the
# number of batches (one job entry is appended per batch).
print(send_error_log)
print(receive_error_log)
print(len(job_ids))
print(len(batch_ids))

[]
[]
2723
2723


### Monitor Job Status
Poll the operation location until the job reports a final status (for example `succeeded` or `failed`) or the polling timeout elapses.

In [91]:
def check_job_status(operation_location, api_key, poll_interval=10, timeout=300, request_timeout=30):
    """
    Polls the job status until it reaches a terminal state or polling times out.

    Parameters:
        operation_location (str): The operation location URL.
        api_key (str): Azure subscription key.
        poll_interval (int): Time to wait between polls (in seconds).
        timeout (int): Maximum total time to keep polling (in seconds).
        request_timeout (float): Seconds to wait for each individual HTTP
            request, so a stalled connection cannot block polling forever.

    Returns:
        dict: JSON response of the job once its status is terminal
        ("succeeded", "failed", "cancelled" or "partiallyCompleted").
        None if a request returns a non-200 status, an exception occurs, or
        polling exceeds ``timeout``.
    """
    headers = {
        "Ocp-Apim-Subscription-Key": api_key
    }
    # States after which the job will not progress any further. Polling on
    # "cancelled"/"partiallyCompleted" would otherwise run until the timeout.
    # NOTE(review): names taken from the Azure job-state enumeration - confirm
    # against the API version in use.
    terminal_states = {"succeeded", "failed", "cancelled", "partiallyCompleted"}

    start_time = time.time()

    while True:
        try:
            response = requests.get(operation_location, headers=headers, timeout=request_timeout)
            if response.status_code != 200:
                print(f"Failed to get job status. Status Code: {response.status_code}")
                print(f"Response: {response.text}")
                return None

            # Parse the body once and reuse it for both the check and the return.
            job_json = response.json()
            job_status = job_json.get("status")
        except Exception as e:
            print(f"Exception occurred while checking job status: {str(e)}")
            return None

        print(f"Job Status: {job_status}")
        if job_status in terminal_states:
            if job_status == "failed":
                print("Job failed.")
            elif job_status != "succeeded":
                print(f"Job ended with status '{job_status}'.")
            return job_json

        print(f"Job is still in progress. Waiting for {poll_interval} seconds before next check.")

        # Check for timeout
        elapsed_time = time.time() - start_time
        if elapsed_time > timeout:
            print("Polling timed out.")
            return None

        time.sleep(poll_interval)


In [89]:
# True if any batch failed at submission ('NONE' is the placeholder job id)
'NONE' in job_ids

False

In [92]:
# Poll every submitted job and record the ones that did not finish cleanly
for idx, operation_location in enumerate(job_ids):
    if idx % 100 == 0:
        print(f"Processing {idx} ----------")
    if operation_location == 'NONE':
        # Submission failed for this batch, so there is no job to poll.
        continue
    job_response = check_job_status(operation_location, API_KEY)
    # check_job_status returns None on HTTP errors, exceptions and polling
    # timeouts; log those jobs instead of crashing on a None subscript.
    if job_response is None or job_response.get("status") == "failed":
        receive_error_log.append(operation_location)


Processing 0 ----------
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Status: succeeded
Job Stat

In [93]:
# Operation locations recorded as failed during polling (empty if none)
print(receive_error_log)

[]


### Retrieve and Process Results
Once the job succeeds, retrieve the results and process them into a structured format.

In [58]:
def retrieve_job_results(operation_location, api_key, request_timeout=30):
    """
    Retrieves the results of a PII detection job.

    Parameters:
        operation_location (str): The operation location URL.
        api_key (str): Azure subscription key.
        request_timeout (float): Seconds to wait for the HTTP request, so a
            stalled connection cannot block the retrieval loop indefinitely.

    Returns:
        dict: JSON response containing the job results, or None if the
        request returns a non-200 status or raises an exception.
    """
    headers = {
        "Ocp-Apim-Subscription-Key": api_key
    }

    try:
        response = requests.get(operation_location, headers=headers, timeout=request_timeout)
        if response.status_code != 200:
            print(f"Failed to retrieve job results. Status Code: {response.status_code}")
            print(f"Response: {response.text}")
            return None

        print("Successfully retrieved job results.")
        return response.json()
    except Exception as e:
        # Best-effort: report the problem and let the caller skip this job.
        print(f"Exception occurred while retrieving job results: {str(e)}")
        return None

def process_results(job_results):
    """
    Processes the job results and structures them into a DataFrame.

    Parameters:
        job_results (dict): JSON response from the job results. ``None`` or
            an empty dict is treated as a job with no detected entities.

    Returns:
        pd.DataFrame: DataFrame with columns
        [file_idx, entity_text, type, positions], one row per detected
        entity. ``file_idx`` is the document "id" converted to int and
        ``positions`` is the tuple (offset, offset + length). The columns
        are present even when no entity was found.
    """
    columns = ["file_idx", "entity_text", "type", "positions"]
    processed_data = []

    tasks = (job_results or {}).get("tasks", {})

    for task in tasks.get("items", []):
        documents = task.get("results", {}).get("documents", [])

        for doc in documents:
            doc_id = int(doc.get("id"))

            for entity in doc.get("entities", []):
                offset = entity.get("offset")
                length = entity.get("length")

                processed_data.append({
                    "file_idx": doc_id,
                    "entity_text": entity.get("text"),
                    "type": entity.get("category"),
                    "positions": (offset, offset + length)
                })

    # Passing the columns explicitly keeps the schema stable for empty
    # results; a column-less frame would write a CSV that cannot be read back.
    return pd.DataFrame(processed_data, columns=columns)


In [104]:
pii_csv_path = 'output/pii_azure_detected.csv'
os.makedirs('output', exist_ok=True)

for idx, operation_location in enumerate(job_ids):
    if idx % 100 == 0:
        print(f"Processing {idx} ----------")
    if operation_location == 'NONE':
        # Submission failed for this batch, so there is nothing to retrieve.
        continue

    job_response = retrieve_job_results(operation_location, API_KEY)
    if job_response is None:
        # Skip this job; passing None on to process_results would crash.
        print('job_response error')
        continue

    res_df = process_results(job_response)
    if res_df.empty:
        # Nothing detected in this batch; writing it would add no rows.
        continue

    # Append instead of re-reading and rewriting the whole CSV each time.
    # The header is written only when the file is new or still empty.
    needs_header = (not os.path.exists(pii_csv_path)) or os.path.getsize(pii_csv_path) == 0
    res_df.to_csv(pii_csv_path, mode='a', header=needs_header, index=False)
    

Processing 0 ----------
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Successfully retrieved job results.
Succ

### Save Results to CSV
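Export the processed PII entities to a CSV file. The cell below is commented out: the loop in the previous section already writes each batch's results to `output/pii_azure_detected.csv`.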

In [25]:
# # Define output CSV path
# output_csv_path = 'output/pii_azure_detected_sample.csv'

# # Save to CSV
# pii_df.to_csv(output_csv_path, index=False)

# print(f"PII detection results saved to '{output_csv_path}'.")


PII detection results saved to 'output/pii_azure_detected_sample.csv'.


In [3]:
# # Configuration Parameters
# with open('input/secret.json', 'r', encoding='UTF-8') as file:
#     secret_json = json.load(file)
# API_KEY = secret_json['LANGUAGE_KEY']
# API_ENDPOINT = secret_json['LANGUAGE_ENDPOINT'] + 'language/analyze-text/jobs'
# # ?api-version=2023-04-15-preview'

# # API_ENDPOINT = "https://azure-pii-detection.cognitiveservices.azure.com/language/analyze-text/jobs"
# API_VERSION = "2023-04-15-preview"
# SHOW_STATS = "True"

# # Rate Limits for S0/F0 Tier
# MAX_REQUESTS_PER_MINUTE = 300
# MAX_REQUESTS_PER_SECOND = 100  # Note: For S0/F0, primary limit is per minute

# # Documents per Request
# DOCS_PER_REQUEST = 5

# # Delay Parameters
# SECONDS_PER_MINUTE = 60
# REQUESTS_PER_BATCH = 5  # Number of requests to send per batch to stay within rate limits
# DELAY_BETWEEN_BATCHES = SECONDS_PER_MINUTE / (MAX_REQUESTS_PER_MINUTE / REQUESTS_PER_BATCH)  # e.g., 60 / (300/5) = 1 second

In [4]:
# def read_json(path='data/obfuscated_data_06.json'):
#     df = pd.read_json(path, orient="records", encoding='utf-8')
#     return df

# def load_test_indices(path='data/test_indices_2.txt'):
#     with open(path, 'r') as file:
#         test_indices = ast.literal_eval(file.read().strip())
#     return test_indices

# # Load Data
# df = read_json()
# test_indices_2 = load_test_indices()


In [None]:
# # Group the documents into batches of 5, as per the API's maximum documents per request.
# def create_batches(test_indices, batch_size=DOCS_PER_REQUEST):
#     """Yield successive batches of test indices."""
#     for i in range(0, len(test_indices), batch_size):
#         yield test_indices[i:i + batch_size]


In [None]:
# def send_pii_detection_request(batch, df, batch_number):
#     """Send a PII detection request for a batch of documents."""
#     documents = []
#     for doc_idx in batch:
#         # Assuming 'id' is a unique identifier in your DataFrame
#         document_id = str(df.iloc[doc_idx]['id'])  # Ensure it's a string
#         text_content = df.iloc[doc_idx]['full_text']
        
#         documents.append({
#             "id": document_id,
#             "language": "en",
#             "text": text_content
#         })
    
#     # Construct the payload
#     payload = {
#         "displayName": f"Document PII Detection Task Batch {batch_number}",
#         "analysisInput": {
#             "documents": documents
#         },
#         "tasks": [
#             {
#                 "kind": "PiiEntityRecognition",
#                 "taskName": f"Document PII Detection Task {batch_number}",
#                 "parameters": {
#                     "model-version": "latest",
#                     "piiCategories": ["Person", "URL", "Email", "PhoneNumber"]
#                 }
#             }
#         ]
#     }
    
#     headers = {
#         "Content-Type": "application/json",
#         "Ocp-Apim-Subscription-Key": API_KEY
#     }
    
#     try:
#         response = requests.post(
#             f"{API_ENDPOINT}?api-version={API_VERSION}&showStats={SHOW_STATS}",
#             headers=headers,
#             data=json.dumps(payload)
#         )
        
#         if response.status_code == 202:
#             operation_location = response.headers.get("operation-location")
#             print(f"Batch {batch_number}: Request accepted. Operation Location: {operation_location}")
#             return operation_location
#         else:
#             print(f"Batch {batch_number}: Failed with status code {response.status_code}. Response: {response.text}")
#             return None
#     except Exception as e:
#         print(f"Batch {batch_number}: Exception occurred - {str(e)}")
#         return None
