<a href="https://colab.research.google.com/github/TOM-BOHN/SFDC-User-Permissions-AI/blob/main/Notebooks/SFDC_User_Permission_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup

Install the Python SDK.

In [2]:
!git clone https://github.com/TOM-BOHN/SFDC-User-Permissions-AI.git

Cloning into 'SFDC-User-Permissions-AI'...
remote: Enumerating objects: 53, done.[K
remote: Counting objects: 100% (53/53), done.[K
remote: Compressing objects: 100% (43/43), done.[K
remote: Total 53 (delta 23), reused 25 (delta 7), pack-reused 0 (from 0)[K
Receiving objects: 100% (53/53), 22.16 KiB | 597.00 KiB/s, done.
Resolving deltas: 100% (23/23), done.


In [3]:
!pip install -Uq "google-genai==1.7.0"

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/144.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m144.7/144.7 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [64]:
from google import genai
from google.genai import types

from IPython.display import Markdown, display

genai.__version__

###################################

import pandas as pd
import enum
import json
import time
import os

### Set up your API key

To run the following cell, your API key must be stored it in a [Kaggle secret](https://www.kaggle.com/discussions/product-feedback/114053) named `GOOGLE_API_KEY`.

If you don't already have an API key, you can grab one from [AI Studio](https://aistudio.google.com/app/apikey). You can find [detailed instructions in the docs](https://ai.google.dev/gemini-api/docs/api-key).

To make the key available through Kaggle secrets, choose `Secrets` from the `Add-ons` menu and follow the instructions to add your key or enable it for this notebook.

In [5]:
from google.colab import userdata

client = genai.Client(api_key=userdata.get('GOOGLE_API_KEY'))

### Automated retry

This codelab sends a lot of requests, so set up an automatic retry
that ensures your requests are retried when per-minute quota is reached.

In [6]:
from google.api_core import retry

is_retriable = lambda e: (isinstance(e, genai.errors.APIError) and e.code in {429, 503})

if not hasattr(genai.models.Models.generate_content, '__wrapped__'):
  genai.models.Models.generate_content = retry.Retry(
      predicate=is_retriable)(genai.models.Models.generate_content)

In [51]:
url = "https://raw.githubusercontent.com/TOM-BOHN/SFDC-User-Permissions-AI/refs/heads/main/Inputs/User_Permission_Reference_Data__Sample.csv"
perm_list_df = pd.read_csv(url)
perm_list_df.head()

Unnamed: 0,Permission Name,API Name,Description
0,Access Data Cloud Data Explorer,AccessCdpDataExplorer,Allows user access Data Cloud Data Explorer.
1,Administer territory operations,ManageTerritories,Prerequisite user permission for a user to man...
2,Allow sending of List Emails,ListEmailSend,"Allow users to create, edit and send List Emails"
3,Api Only User,ApiUserOnly,Access Salesforce.com only through a Salesforc...
4,Author Apex,AuthorApex,Create Apex classes and triggers.


In [8]:
with open('/content/SFDC-User-Permissions-AI/Prompts/prompt_user_perm_risk_rating.md', 'r') as f:
    PROMPT_USER_PERM_RISK_RATING = f.read()

print(PROMPT_USER_PERM_RISK_RATING)

# Permission Risk Evaluation Prompt Template  
# --------------------------------------------------
# This template can be imported and formatted with the specific
# `permission_name` and `permission_description` variables to create
# a concrete evaluation prompt for any Salesforce permission.
# --------------------------------------------------

PERMISSION_RISK_PROMPT = """
# Instruction
You are a **Salesforce security risk assessor**.
Your task is to evaluate the **inherent risk level** of a Salesforce permission (or capability) when granted to a user.
We will provide you with the permission name and a short description of what it allows.
Analyze the permission against the **Evaluation Criteria** below and assign one of the five **Risk Levels** defined in the Rating Rubric.
Give step‑by‑step reasoning for your decision, citing the specific criteria that most influenced your rating.

# Evaluation

## Metric Definition
**Permission Risk** [aka weighted_score] measures the potential neg

In [52]:
# Define a structured enum class to capture the result.
class RiskRating(enum.Enum):
  MISSION_CRITICAL = '5'
  RESTRICTED       = '4'
  SENSITIVE        = '3'
  CONTROLLED       = '2'
  GENERAL          = '1'

def eval_summary(PROMPT, name, api_name, description):
  """Evaluate the generated summary against the prompt used."""

  chat = client.chats.create(model='gemini-2.0-flash')

  # Generate the full text response.
  response = chat.send_message(
      message=PROMPT.format(
          permission_name = name
        , permission_description = description
      )
  )
  verbose_eval = response.text

  # Coerce into the desired structure.
  structured_output_config = types.GenerateContentConfig(
      response_mime_type="text/x.enum",
      response_schema=RiskRating,
  )
  response = chat.send_message(
      message="Convert the final score.",
      config=structured_output_config,
  )
  structured_eval = response.parsed

  return verbose_eval, structured_eval


text_eval, struct_eval = eval_summary(
    PROMPT      = PROMPT_USER_PERM_RISK_RATING
  , name        = perm_list_df['Permission Name'][0]
  , api_name    = perm_list_df['API Name'][0]
  , description = perm_list_df['Description'][0]
)

Markdown(text_eval)

```json
{
  "risk_tier": "Sensitive",
  "risk_rating": "3",
  "weighted_score": 3.0,
  "scores": {
    "Data_Sensitivity": 3,
    "Scope_of_Impact": 3,
    "Configurational_Authority": 2,
    "External_Data_Exposure": 2,
    "Regulatory_Obligation": 3,
    "Segregation_of_Duties": 3,
    "Auditability": 4,
    "Reversibility": 4
  },
  "rationale": "Access to Data Cloud Data Explorer allows users to view and potentially manipulate data from various sources, including potentially sensitive customer data. The scope of impact is limited to Data Cloud, but data sensitivity and potential regulatory obligations warrant a Sensitive risk level. Auditability and reversibility are reasonably good, lowering the overall risk slightly.",
  "confidence": "High"
}
```

In [53]:
struct_eval

<RiskRating.SENSITIVE: '3'>

In [56]:
def classify_risk_rating(input_df, prompt, total_records = None, checkin_interval = 60, debug = True):

  # If total records is None, then process all the records in the dataframew
  if total_records is None:
    total_records = len(input_df)

  # Create a results dataframe for output
  results_df = pd.DataFrame(columns=['Permission Name', 'API Name', 'Description', 'Risk Rating', 'Evaluation'])

  # Start tracking time
  start_time = time.time()
  last_checkin = start_time
  print(f"Starting job to process {total_records} records.")
  print('####################\n')

  # Iteratively prompt and get results
  for i in range(total_records):

    # Checkin on Progress of Job
    current_time = time.time()
    if current_time - last_checkin >= checkin_interval:
      print(f"Progress checkin: {current_time - start_time:.2f} seconds elapsed. Completed {i+1} of {total_records}.")
      last_checkin = current_time

    # Print the results of the processing
    if debug == True:
      print(f'Analyzing Permission {i+1} of {total_records}...')
      print('Name:       ', input_df['Permission Name'][i])
      print('API Name:   ', input_df['API Name'][i])
      print('Description:', input_df['Description'][i])
      print('--------------------')

    # Substitute variables into the prompt
    text_eval, struct_eval = eval_summary(
          PROMPT      = prompt
        , name        = input_df['Permission Name'][0]
        , api_name    = input_df['API Name'][0]
        , description = input_df['Description'][0]
      )

    # Print the results of the processing
    if debug == True:
      print('Risk Rating:', struct_eval)
      print('####################')
      print()

    # Append the results to the DataFrame
    results_df.loc[len(results_df)] = [
          input_df['Permission Name'][i]
        , input_df['API Name'][i]
        , input_df['Description'][i]
        , struct_eval   #Risk Rating
        , text_eval     #Evaluation
    ]

  # Close the loop by reporting on time taken
  end_time = time.time()
  total_time = end_time - start_time
  print('\n####################')
  print(f"Total time taken: {total_time:.2f} seconds to process {total_records} records.")

  # Print a sample of the output table
  if debug == True:
    print('Sample Output of Resultes:')
    display(results_df.head(5))
    print()

  return results_df

In [57]:
results_df = classify_risk_rating(
      input_df = perm_list_df
    , prompt = PROMPT_USER_PERM_RISK_RATING
    , total_records = 2
    , checkin_interval = 60
    , debug = True
  )
results_df

Starting job to process 2 records.
####################

Analyzing Permission 1 of 2...
Name:        Access Data Cloud Data Explorer
API Name:    AccessCdpDataExplorer
Description: Allows user access Data Cloud Data Explorer.
--------------------
Risk Rating: RiskRating.SENSITIVE
####################

Analyzing Permission 2 of 2...
Name:        Administer territory operations
API Name:    ManageTerritories
Description: Prerequisite user permission for a user to manage a territory branch.
--------------------
Risk Rating: RiskRating.SENSITIVE
####################


####################
Total time taken: 5.79 seconds to process 2 records.
Sample Output of Resultes:


Unnamed: 0,Permission Name,API Name,Description,Risk Rating,Evaluation
0,Access Data Cloud Data Explorer,AccessCdpDataExplorer,Allows user access Data Cloud Data Explorer.,RiskRating.SENSITIVE,"```json\n{\n ""risk_tier"": ""Sensitive"",\n ""ri..."
1,Administer territory operations,ManageTerritories,Prerequisite user permission for a user to man...,RiskRating.SENSITIVE,"```json\n{\n ""risk_tier"": ""Sensitive"",\n ""ri..."





Unnamed: 0,Permission Name,API Name,Description,Risk Rating,Evaluation
0,Access Data Cloud Data Explorer,AccessCdpDataExplorer,Allows user access Data Cloud Data Explorer.,RiskRating.SENSITIVE,"```json\n{\n ""risk_tier"": ""Sensitive"",\n ""ri..."
1,Administer territory operations,ManageTerritories,Prerequisite user permission for a user to man...,RiskRating.SENSITIVE,"```json\n{\n ""risk_tier"": ""Sensitive"",\n ""ri..."


In [58]:
def extract_json_fields(results_df, json_column='Evaluation',  debug = True):
    """
    Extracts fields from a JSON column in a DataFrame and adds them as new columns.

    Args:
        results_df: The input DataFrame containing the JSON column.
        json_column: The name of the column containing the JSON data (default: 'Evaluation').

    Returns:
        The modified DataFrame with extracted fields as new columns.
    """

    # Create new columns for extracted fields
    new_columns = ['Risk Tier', 'Risk Rating', 'Weighted Score', 'Scores', 'Rationale', 'Confidence']
    for col in new_columns:
        if col not in results_df.columns:
          results_df[col] = None

    # Cleans the JSON field in a DataFrame to comply with JSON formatting standards.
    def clean_json_string(json_string):
        """Cleans a single JSON string."""
        json_string = json_string.replace("```json\n", "")
        json_string = json_string.replace("\n```", "")
        json_string = json_string.replace("\n", "")
        json_string = json_string.replace("```", "")
        return json_string

    # Apply the JSON cleaning to the JSON column
    results_df[json_column] = results_df[json_column].apply(clean_json_string)

    for index, row in results_df.iterrows():
        try:
            eval_data = json.loads(row[json_column])

            # Extract and assign values to new columns
            results_df.loc[index, 'Risk Tier'] = eval_data.get('risk_tier', '')
            results_df.loc[index, 'Risk Rating'] = eval_data.get('risk_rating', '')
            results_df.loc[index, 'Weighted Score'] = eval_data.get('weighted_score', '')
            results_df.loc[index, 'Confidence'] = eval_data.get('confidence', '')
            results_df.loc[index, 'Scores'] = str(eval_data.get('scores', ''))
            results_df.loc[index, 'Rationale'] = eval_data.get('rationale', '')

        except json.JSONDecodeError:
            print(f"Error decoding JSON for record at index {index}. Skipping row.")

    # Print a sample of the output table
    if debug == True:
      print('Sample Output of Resultes:')
      display(results_df.head(5))
      print()


    return results_df

In [59]:
full_results_df = extract_json_fields(
    results_df
  , json_column='Evaluation'
  , debug = True
)

Sample Output of Resultes:


Unnamed: 0,Permission Name,API Name,Description,Risk Rating,Evaluation,Risk Tier,Weighted Score,Scores,Rationale,Confidence
0,Access Data Cloud Data Explorer,AccessCdpDataExplorer,Allows user access Data Cloud Data Explorer.,3,"{ ""risk_tier"": ""Sensitive"", ""risk_rating"": ""...",Sensitive,3.0,"{'Data_Sensitivity': 3, 'Scope_of_Impact': 3, ...",Access to Data Cloud Data Explorer allows user...,Medium
1,Administer territory operations,ManageTerritories,Prerequisite user permission for a user to man...,3,"{ ""risk_tier"": ""Sensitive"", ""risk_rating"": ""...",Sensitive,2.6,"{'Data_Sensitivity': 3, 'Scope_of_Impact': 3, ...",Access to Data Cloud Data Explorer grants the ...,High





In [60]:
results_df['Evaluation'][1]

'{  "risk_tier": "Sensitive",  "risk_rating": "3",  "weighted_score": 2.6,  "scores": {    "Data_Sensitivity": 3,    "Scope_of_Impact": 3,    "Configurational_Authority": 1,    "External_Data_Exposure": 2,    "Regulatory_Obligation": 3,    "Segregation_of_Duties": 2,    "Auditability": 3,    "Reversibility": 3  },  "rationale": "Access to Data Cloud Data Explorer grants the ability to explore data, potentially including sensitive data depending on what\'s stored. The scope of the impact is moderate, affecting the data accessible through Data Cloud. There\'s a moderate risk of violating regulatory obligations depending on the sensitivity of data exposed in the Data Cloud and the auditability of the explorer activities.",  "confidence": "High"}'

In [61]:
results_df.iloc[0].to_dict()

{'Permission Name': 'Access Data Cloud Data Explorer',
 'API Name': 'AccessCdpDataExplorer',
 'Description': 'Allows user access Data Cloud Data Explorer.',
 'Risk Rating': '3',
 'Evaluation': '{  "risk_tier": "Sensitive",  "risk_rating": "3",  "weighted_score": 3.0,  "scores": {    "Data_Sensitivity": 3,    "Scope_of_Impact": 3,    "Configurational_Authority": 2,    "External_Data_Exposure": 3,    "Regulatory_Obligation": 3,    "Segregation_of_Duties": 3,    "Auditability": 3,    "Reversibility": 3  },  "rationale": "Access to Data Cloud Data Explorer allows users to explore data within Data Cloud, potentially exposing sensitive data depending on the data stored. The scope of impact is moderate, limited to data within Data Cloud. While it allows exploration, it\'s not clear if it allows data modification or export, but could still violate regulatory obligations if misused.",  "confidence": "Medium"}',
 'Risk Tier': 'Sensitive',
 'Weighted Score': 3.0,
 'Scores': "{'Data_Sensitivity': 

In [65]:
# Create output directory if it doesn't exist
output_dir = 'output/data'
os.makedirs(output_dir, exist_ok=True)

 # Save the results to a CSV file
output_file = os.path.join(output_dir, 'results.csv')
results_df.to_csv(output_file, index=False)  # index=False to avoid saving the index