In [None]:
# Install the third-party packages this notebook uses (quiet mode).
# NOTE(review): pandas and numpy are imported by later cells but are not listed here —
# presumably they are preinstalled in the target environment; confirm.
%pip -q install google-generativeai tqdm ujson pyyaml seaborn matplotlib

In [None]:

# 1) Imports + API key

import os, glob, zipfile, re, unicodedata, ujson, json, yaml
import pandas as pd
from tqdm import tqdm
from typing import List, Dict, Any
from IPython.display import display

import matplotlib.pyplot as plt
import seaborn as sns

import google.generativeai as genai

# ---- Gemini API key ----
# Prefer the GOOGLE_API_KEY environment variable so the secret never has to be
# saved inside the notebook. The literal is only a fallback for local experiments:
# replace "..." if (and only if) you cannot set the environment variable.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY") or "...")


In [None]:

# 2) ZIP path (local file) + extract

import shutil

# ---- Set the path to your ZIP on disk (relative or absolute) ----
ZIP_PATH = r"C:\Users\Sanglap\Desktop\Prodigal Assignment\All_Conversations.zip"  # e.g., "./transcripts.zip"

# Validate the configured path up front, before anything is deleted or extracted.
if not (ZIP_PATH and ZIP_PATH.lower().endswith(".zip")):
    raise SystemExit(f"Please set ZIP_PATH to a .zip file (got: {ZIP_PATH})")
if not os.path.exists(ZIP_PATH):
    raise SystemExit(f"ZIP not found at: {ZIP_PATH}")

EXTRACT_DIR = "./data"
OUT_DIR = "./outputs"
os.makedirs(OUT_DIR, exist_ok=True)

OUT_JSONL = os.path.join(OUT_DIR, "gemini_analysis.jsonl")
OUT_SUMMARY_CSV = os.path.join(OUT_DIR, "summary.csv")

# Remove any previous extraction so files from an earlier ZIP cannot be picked up.
if os.path.exists(EXTRACT_DIR):
    shutil.rmtree(EXTRACT_DIR)
os.makedirs(EXTRACT_DIR, exist_ok=True)

with zipfile.ZipFile(ZIP_PATH, "r") as zf:
    zf.extractall(EXTRACT_DIR)

# Walk the extracted tree and collect JSON/YAML transcripts, skipping noise directories.
EXCLUDED_DIRS = {".ipynb_checkpoints", "__MACOSX", ".git", ".svn"}
files = []
for dirpath, dirnames, filenames in os.walk(EXTRACT_DIR):
    # Slice assignment edits the list os.walk is iterating from, so pruned
    # directories are never descended into.
    dirnames[:] = [d for d in dirnames if not (d in EXCLUDED_DIRS or d.startswith("."))]
    files.extend(
        os.path.join(dirpath, fn)
        for fn in filenames
        if not fn.startswith(".") and fn.lower().endswith((".json", ".yaml", ".yml"))
    )

files.sort()
print(f"Extracted to: {EXTRACT_DIR}")
print(f"Discovered {len(files)} transcript file(s). Showing up to 10:")
for p in files[:10]:
    print(" -", p)
if not files:
    raise SystemExit("No .json/.yaml/.yml files found inside the ZIP.")


In [None]:

# 3) YOUR GEMINI MODEL CODE 


# --- Set up Gemini API ---
# Probe the API once. If the probe fails, reconfigure from the GOOGLE_API_KEY
# environment variable when it is set. The existing configuration is never
# replaced with a placeholder string: a transient failure (network, quota,
# model lookup) must not wipe out a key that was configured earlier.
try:
    genai.get_model('gemini-1.5-flash-latest')
except Exception as exc:
    env_api_key = os.environ.get("GOOGLE_API_KEY")
    if env_api_key:
        print("Configuring Gemini API...")
        genai.configure(api_key=env_api_key)
    else:
        print(
            f"Warning: Gemini model check failed ({exc}). "
            "Set GOOGLE_API_KEY or re-run the API-key cell before analysing transcripts."
        )

# --- Define Gemini Prompt ---
# System instruction handed to genai.GenerativeModel(...) by
# analyze_transcript_gemini_with_profanity. It asks for two checks per transcript:
# profanity detection and PII disclosure before identity verification.
#
# NOTE: the five labels in the OUTPUT FORMAT section ("Profanity instances:",
# "PII disclosure before verification:", "Rationale:", "Verification indices:",
# "Disclosure indices:") are exactly what
# analyze_transcript_gemini_with_profanity_raw_text searches for in the reply.
# Renaming a label here requires the matching change in that parser.
GEMINI_PII_SYSTEM_PROMPT_WITH_PROFANITY = """You are an expert compliance and conversation analysis assistant, specializing in precise identification of profanity and sensitive information disclosure within customer service transcripts. Your analysis must be strictly objective, based solely on the provided text, and adhere to the defined rules and output format.

*INPUT FORMAT:*
You will receive a conversation transcript as a sequence of indexed utterances. Each utterance is formatted as:
[index] SPEAKER: text

- [index]: A 0-based integer representing the utterance's position in the original transcript. *Crucially, these indices must be preserved exactly in your output.*
- SPEAKER: Will always be one of two predefined roles: AGENT or CUSTOMER.
- text: The spoken content of the utterance.

*PRIMARY OBJECTIVES:*
Your task is to generate an analysis that provides the following two key checks:
1.  *Profanity Detection:* Identify all instances of profanity, noting the exact offending phrase, the speaker, and the utterance index.
2.  *Privacy/Compliance Audit (PII Disclosure):* Determine if the AGENT disclosed any sensitive account information before the CUSTOMER's identity was successfully verified.

*KEY DEFINITIONS (Strict Interpretation Required):*

A.  *PROFANITY:*
    -   *Definition:* Any explicit offensive, vulgar, or abusive terms. This includes, but is not limited to, common curse words (e.g., f*ck, sh*t, a**hole, b*tch, damn, hell, crap, ass), and their common variants or obfuscated forms (e.g., f@#k, f u c k, ffffuu…, sh!t, a$$hole). Slur variants are also included.
    -   *Detection Rule:* You must identify and extract the exact substring as it appears in the utterance text. Do not normalize, infer, or invent text that is not explicitly present.
    -   *Output Format for Profanity:* Each detected instance must be represented as an object with speaker, text (the exact offending phrase), and index (the utterance index). Collect all instances into a list.

B.  *IDENTITY VERIFICATION (“verification”):*
    -   *Definition:* The process by which the AGENT confirms the CUSTOMER's identity using specific Personally Identifiable Information (PII).
    -   *Acceptable Verification Signals (PII):*
        -   *Date of Birth (DOB):* Full date of birth (e.g., MM/DD/YYYY) or components (e.g., month, day, year) that, when combined, constitute a full DOB.
        -   *Full Address:* Complete street address including street number, street name, city, state, and ZIP code, or a combination of street name and ZIP code.
        -   *Social Security Number (SSN):* Full SSN or the last four digits (last-4) of the SSN. This also applies to equivalent government IDs.
    -   *Verification Event Completion:* A verification event is considered successfully completed only when both conditions are met:
        1.  The AGENT explicitly requests one or more of the acceptable PII signals.
        2.  The CUSTOMER provides or confirms the correct information in response to the AGENT's request.
        3.  Alternatively, the AGENT explicitly states that verification was successful (e.g., “Okay, I’ve verified your identity.”).
    -   *What is NOT Verification:*
        -   Mere confirmation of name, phone number, or email address.
        -   Caller ID information.
        -   Generic security questions that do not involve specific PII (e.g., “Can you confirm your account?” without subsequent PII exchange).
        -   Partial verification where the CUSTOMER refuses to provide the requested PII (e.g., AGENT asks for last-4 SSN, but CUSTOMER declines).

C.  *SENSITIVE DISCLOSURE (“disclosure”):*
    -   *Definition:* The AGENT revealing specific, private account or balance details to the CUSTOMER.
    -   *Examples of Sensitive Information:*
        -   Current balance, amount due, or amount owed.
        -   Exact dollar amounts related to the account (e.g., “Your balance is $X.XX”).
        -   Full or partial account numbers (e.g., last-4 digits of an account number).
        -   Payment due dates.
        -   Last payment amount or date.
        -   Any other data that explicitly confirms the CUSTOMER’s private account state or transaction history.
    -   *What is NOT Disclosure:*
        -   Generic policy statements or disclaimers (e.g., “We cannot discuss your account until we verify your identity.”).
        -   Salutations, on-hold notices, or filler words, unless they contain sensitive information.
    -   *Important Edge Case:* If the CUSTOMER states a sensitive detail (e.g., an amount) first, and the AGENT then confirms or corrects that detail before successful identity verification, this still counts as an AGENT disclosure.

**DECISION RULES FOR pii_disclosure_without_verification FLAG:**
This boolean flag (true or false) indicates a compliance violation.

-   **Set to true IF:** Any AGENT disclosure (as defined in C) occurs before the first successful identity verification event (as defined in B) in the transcript.
-   **Set to false IF:**
    -   Successful identity verification occurs before any AGENT disclosure.
    -   There are AGENT disclosures, but they all occur after the first successful verification.
    -   There is no AGENT disclosure at all in the transcript.
-   *Ambiguity Resolution:* If there is any uncertainty or lack of explicit evidence in the transcript to confirm a violation, you must default to false. Provide a brief rationale citing the relevant indices for your decision.

*OUTPUT FORMAT (Structured Text):*
Provide your analysis in the following structured format. Do not include any other commentary or conversational text.

Profanity instances: [list of profanity objects, e.g., [{"speaker": "CUSTOMER", "text": "f*ck", "index": 1}, ...]]
PII disclosure before verification: [true/false]
Rationale: [brief explanation, citing relevant utterance indices]
Verification indices: [list of 0-based utterance indices, e.g., [3, 4]]
Disclosure indices: [list of 0-based utterance indices, e.g., [2]]

*FINAL INSTRUCTION:*
Analyze the provided transcript and provide your analysis ONLY in the specified format.
"""

# --- Helper function to parse raw text output from Gemini ---
def _labelled_json_list(text: str, label: str):
    """Decode the JSON list that follows ``<label>:`` in ``text``.

    The label is matched case-insensitively; whitespace and markdown asterisks
    between the colon and the opening ``[`` are tolerated.

    Returns:
        The decoded list, or None when the label (followed by ``[``) is absent.

    Raises:
        json.JSONDecodeError: a list starts after the label but is not valid JSON.
    """
    found = re.search(re.escape(label) + r":[\s*]*(?=\[)", text, re.IGNORECASE)
    if found is None:
        return None
    # raw_decode consumes exactly one JSON value and ignores whatever follows, so a
    # ']' inside a quoted phrase or trailing commentary cannot truncate the list.
    value, _consumed = json.JSONDecoder().raw_decode(text[found.end():])
    return value


def _labelled_index_list(text: str, kind: str) -> List[int]:
    """Return the integer indices listed after ``<kind> indices:`` ([] if absent or invalid)."""
    try:
        values = _labelled_json_list(text, f"{kind} indices")
    except json.JSONDecodeError as e:
        print(f"Warning: Failed to parse {kind} indices list: {e}")
        return []
    if values is None:
        return []
    try:
        # bool is a subclass of int; a stray true/false must not become index 1/0.
        return [int(v) for v in values if isinstance(v, (int, float)) and not isinstance(v, bool)]
    except Exception as e:
        print(f"Warning: Error processing {kind} indices list: {e}")
        return []


def analyze_transcript_gemini_with_profanity_raw_text(gemini_output_text: str) -> Dict[str, Any]:
    """Parse Gemini's labelled plain-text reply into a structured result.

    Labels are matched case-insensitively against the ORIGINAL text, so the
    extracted profanity phrases, speaker values and rationale keep the casing
    Gemini produced.

    Args:
        gemini_output_text: Raw reply text; may be empty or None.

    Returns:
        A dict that always contains:
          - 'profanity': list of dicts having 'speaker', 'text' and 'index' keys
            (keys are lower-cased; values are left untouched)
          - 'pii_disclosure_without_verification': bool (False when the flag is missing)
          - 'rationale': str
          - 'checked_verification_indices': list of int
          - 'disclosure_indices': list of int
        Sections that are missing or malformed fall back to their defaults and a
        warning is printed; the function does not raise for malformed replies.
    """
    result = {
        'profanity': [],
        'pii_disclosure_without_verification': False,
        'rationale': 'Parsing failed or information not found.',
        'checked_verification_indices': [],
        'disclosure_indices': []
    }

    if not gemini_output_text:
        return result

    # Profanity instances
    try:
        profanity_list = _labelled_json_list(gemini_output_text, "profanity instances")
    except json.JSONDecodeError as e:
        print(f"Warning: Failed to parse profanity list: {e}")
        profanity_list = None
    for item in (profanity_list or []):
        if not isinstance(item, dict):
            continue
        # Accept keys in any casing ("Speaker", "TEXT", ...) but expose them lower-cased.
        entry = {str(key).lower(): value for key, value in item.items()}
        if 'speaker' in entry and 'text' in entry and 'index' in entry:
            result['profanity'].append(entry)

    # PII disclosure flag. Tolerates "[true]" and markdown bold around the label.
    # The negative lookahead rejects an echoed "[true/false]" template.
    pii_match = re.search(
        r"pii disclosure before verification:[\s*\[]*(true|false)\b(?!\s*/)",
        gemini_output_text,
        re.IGNORECASE,
    )
    result['pii_disclosure_without_verification'] = (
        pii_match is not None and pii_match.group(1).lower() == 'true'
    )

    # Rationale: everything after the label up to the next known label (or end of text)
    rationale_match = re.search(
        r"rationale:\s*(.*?)(?=verification indices:|disclosure indices:|profanity instances:|$)",
        gemini_output_text,
        re.IGNORECASE | re.DOTALL,
    )
    if rationale_match:
        result['rationale'] = rationale_match.group(1).strip()
    else:
        result['rationale'] = 'Rationale section not found.'

    # Verification / disclosure indices
    result['checked_verification_indices'] = _labelled_index_list(gemini_output_text, "verification")
    result['disclosure_indices'] = _labelled_index_list(gemini_output_text, "disclosure")

    return result

# --- Gemini Analysis Function ---
def _gemini_failure_result(reason: str) -> Dict[str, Any]:
    """Build the neutral (no findings) result returned when Gemini could not be used."""
    return {
        'profanity': [],
        'pii_disclosure_without_verification': False,
        'rationale': reason,
        'checked_verification_indices': [],
        'disclosure_indices': []
    }


def analyze_transcript_gemini_with_profanity(
    utts: List[Dict[str, str]],
    model_name: str = 'gemini-1.5-flash-latest',
) -> Dict[str, Any]:
    """Send one transcript to Gemini and return the parsed analysis.

    Args:
        utts: Utterances in order; each is a dict with 'speaker' and 'text'.
            Values that are not strings are converted with str(), and None is
            treated as missing. Transcripts loaded from YAML can contain such
            values (an unquoted ``yes``/``no`` or a bare number), and these
            previously crashed on ``.strip()``.
        model_name: Gemini model to use; defaults to the model this notebook
            has always used.

    Returns:
        The dict produced by analyze_transcript_gemini_with_profanity_raw_text.
        If the model cannot be created or the request fails, a neutral result
        is returned whose 'rationale' carries the error message; API errors are
        reported this way rather than raised.
    """
    # Build the user message: one "[index] SPEAKER: text" line per utterance
    user_message_lines = []
    for i, utt in enumerate(utts):
        raw_speaker = utt.get('speaker', 'UNKNOWN')
        raw_text = utt.get('text', '')
        speaker = ('UNKNOWN' if raw_speaker is None else str(raw_speaker)).strip().upper()
        text = ('' if raw_text is None else str(raw_text)).strip()
        user_message_lines.append(f"[{i}] {speaker}: {text}")
    user_message = "Transcript:\n" + "\n".join(user_message_lines) + "\n\nAnalyze and provide the output in the specified format."

    # Create model
    try:
        model = genai.GenerativeModel(model_name, system_instruction=GEMINI_PII_SYSTEM_PROMPT_WITH_PROFANITY)
    except Exception as e:
        print(f"Error creating Gemini model instance: {e}")
        return _gemini_failure_result(f'Gemini model instantiation failed: {e}')

    # Generate
    try:
        response = model.generate_content(user_message)
        gemini_output_text = response.text
    except Exception as e:
        print(f"Error generating content from Gemini model: {e}")
        return _gemini_failure_result(f'Gemini content generation failed: {e}')

    return analyze_transcript_gemini_with_profanity_raw_text(gemini_output_text)

In [None]:

# 4) MAIN PROCESSING

processed_count = 0
failed_files = []          # (path, reason) for files that were skipped or crashed
all_analysis_results = []  # per-file result dicts; also consumed by the summary cell

files_to_process = files  # ALL files
print(f"Processing all {len(files_to_process)} files.")

# Start from a clean JSONL output
if os.path.exists(OUT_JSONL):
    os.remove(OUT_JSONL)

# One Gemini call per transcript covers both the PII/disclosure and the profanity check
with open(OUT_JSONL, "w", encoding="utf-8") as f_out:
    for file_path in tqdm(files_to_process, desc="Processing files with Gemini"):
        try:
            # The extension picks the loader: .json -> json, anything else -> YAML
            with open(file_path, "r", encoding="utf-8") as f_in:
                data = json.load(f_in) if file_path.lower().endswith(".json") else yaml.safe_load(f_in)

            # A transcript must be a list of dicts that each carry 'speaker' and 'text'
            is_transcript = isinstance(data, list) and all(
                isinstance(item, dict) and 'speaker' in item and 'text' in item
                for item in data
            )
            if not is_transcript:
                print(f"Skipping file {file_path}: Data is not in expected list-of-dicts format.")
                failed_files.append((file_path, "Incorrect data format"))
                continue

            gemini_analysis = analyze_transcript_gemini_with_profanity(data)

            combined = {
                'original_file': os.path.basename(file_path),
                'profanity': gemini_analysis.get('profanity', []),
                'pii_disclosure_without_verification': gemini_analysis.get(
                    'pii_disclosure_without_verification', False
                ),
                'rationale': gemini_analysis.get('rationale', 'Gemini analysis failed or incomplete'),
                'checked_verification_indices': gemini_analysis.get('checked_verification_indices', []),
                'disclosure_indices': gemini_analysis.get('disclosure_indices', []),
            }

            all_analysis_results.append(combined)
            f_out.write(ujson.dumps(combined, ensure_ascii=False) + '\n')
            processed_count += 1

        except Exception as e:
            # Keep going: record the failure and add a neutral in-memory result.
            # This placeholder is NOT written to the JSONL file.
            print(f"Error processing file {file_path}: {e}")
            failed_files.append((file_path, str(e)))
            all_analysis_results.append({
                'original_file': os.path.basename(file_path),
                'profanity': [],
                'pii_disclosure_without_verification': False,
                'rationale': f'Processing failed: {e}',
                'checked_verification_indices': [],
                'disclosure_indices': [],
            })

print(f"\nFinished processing. Successfully processed {processed_count} files.")
if failed_files:
    print(f"Failed to process {len(failed_files)} files:")
    for fname, error in failed_files:
        print(f"- {fname}: {error}")

print(f"Per-file JSONL saved to {OUT_JSONL}")

In [None]:

# 5) SUMMARY 

import os
import pandas as pd

# Build one flat row per analysed call: who used profanity, how often, and
# whether sensitive details were disclosed before verification.
simple_rows = []

for item in all_analysis_results:
    file_name = item.get("original_file", "N_A")
    call_id   = os.path.splitext(os.path.basename(file_name))[0]

    # Keep only dict entries: anything else has no .get() and would crash below.
    prof_list = [p for p in (item.get("profanity", []) or []) if isinstance(p, dict)]
    # Normalize speakers to uppercase; treat CALLER as CUSTOMER
    agent_hits = [p for p in prof_list if str(p.get("speaker","")).strip().upper() == "AGENT"]
    cust_hits  = [p for p in prof_list if str(p.get("speaker","")).strip().upper() in {"CUSTOMER","CALLER"}]

    profane_words_agent    = bool(agent_hits)
    profane_words_customer = bool(cust_hits)

    agent_profane_words_count    = len(agent_hits)
    customer_profane_words_count = len(cust_hits)

    disclosed_before_verify = bool(item.get("pii_disclosure_without_verification", False))

    simple_rows.append({
        "call_id": call_id,
        "profane_words_agent": profane_words_agent,
        "profane_words_customer": profane_words_customer,
        "agent_profane_words_count": agent_profane_words_count,
        "customer_profane_words_count": customer_profane_words_count,
        "disclosed_before_verify": disclosed_before_verify,
    })

# Explicit columns keep the frame well-formed when no file produced a result;
# without them an empty frame has no 'call_id' column and sort_values raises KeyError.
simple_df = pd.DataFrame(
    simple_rows,
    columns=[
        "call_id",
        "profane_words_agent",
        "profane_words_customer",
        "agent_profane_words_count",
        "customer_profane_words_count",
        "disclosed_before_verify",
    ],
).sort_values("call_id").reset_index(drop=True)

# Save the simplified per-call summary
simple_df.to_csv(OUT_SUMMARY_CSV, index=False, encoding="utf-8")
print(f"Simple per-call summary saved to: {OUT_SUMMARY_CSV}")
display(simple_df.head())


In [None]:
# === Post-run counts — counts for the simple per-call summary CSV ===


import os
import numpy as np
import pandas as pd

# Stop with a clear message if the per-call summary CSV has not been written yet.
assert os.path.exists(OUT_SUMMARY_CSV), f"CSV not found: {OUT_SUMMARY_CSV}"
# Reload from disk so the counts below describe the saved CSV, not in-memory state.
df = pd.read_csv(OUT_SUMMARY_CSV)

def _as_bool_series(s: pd.Series) -> pd.Series:
    """Coerce mixed True/False/string/None into boolean with NaN for unknown."""
    if s is None:
        return pd.Series([np.nan] * len(df))
    def _c(v):
        if isinstance(v, bool): return v
        if pd.isna(v): return np.nan
        sv = str(v).strip().lower()
        if sv in ("true","1","yes"):  return True
        if sv in ("false","0","no"):  return False
        return np.nan
    return s.map(_c)

# Boolean flag columns, coerced to True/False/NaN (all-NaN when a column is absent)
if "profane_words_agent" in df.columns:
    prof_agent = _as_bool_series(df["profane_words_agent"])
else:
    prof_agent = pd.Series([np.nan] * len(df))

if "profane_words_customer" in df.columns:
    prof_cust = _as_bool_series(df["profane_words_customer"])
else:
    prof_cust = pd.Series([np.nan] * len(df))

if "disclosed_before_verify" in df.columns:
    disclose = _as_bool_series(df["disclosed_before_verify"])
else:
    disclose = pd.Series([np.nan] * len(df))

# Numeric count columns (values that cannot be parsed become NaN)
if "agent_profane_words_count" in df.columns:
    agent_cnt = pd.to_numeric(df["agent_profane_words_count"], errors="coerce")
else:
    agent_cnt = pd.Series([np.nan] * len(df))

if "customer_profane_words_count" in df.columns:
    cust_cnt = pd.to_numeric(df["customer_profane_words_count"], errors="coerce")
else:
    cust_cnt = pd.Series([np.nan] * len(df))

# High-level aggregates: for each flag, how many calls are True / False / unknown
AGG_COUNTS = {
    "calls_total": int(len(df)),

    "calls_profane_agent_true": int(prof_agent.eq(True).sum()),
    "calls_profane_agent_false": int(prof_agent.eq(False).sum()),
    "calls_profane_agent_missing": int(prof_agent.isna().sum()),

    "calls_profane_customer_true": int(prof_cust.eq(True).sum()),
    "calls_profane_customer_false": int(prof_cust.eq(False).sum()),
    "calls_profane_customer_missing": int(prof_cust.isna().sum()),

    "calls_disclosed_before_verify_true": int(disclose.eq(True).sum()),
    "calls_disclosed_before_verify_false": int(disclose.eq(False).sum()),
    "calls_disclosed_before_verify_missing": int(disclose.isna().sum()),
}

# Value distribution of every column except the identifier
COUNTS_BY_COLUMN = {
    col: df[col].value_counts(dropna=False).to_dict()
    for col in df.columns
    if col != "call_id"
}

# Both views bundled into one object for export or inspection
AGG_COUNTS_ALL = {
    "summary": AGG_COUNTS,
    "by_column": COUNTS_BY_COLUMN,
}

# Peek
print("AGG_COUNTS =", AGG_COUNTS)


In [None]:
# === Create a NEW summary CSV that includes utterance numbers per role (does NOT overwrite the original) ===
# Prereqs:
#   - OUT_SUMMARY_CSV already exists (your per-call summary)
#   - Preferably OUT_JSONL exists (the detailed per-file records saved by the main run)
#      Each JSONL line written by the main run has keys: original_file, profanity, pii_disclosure_without_verification, rationale, checked_verification_indices, disclosure_indices
#   - If OUT_JSONL is missing, this will try a fallback column 'profanity_list_json' in the summary CSV (if you added it earlier)

import os, json, ujson, pandas as pd

assert os.path.exists(OUT_SUMMARY_CSV), f"CSV not found: {OUT_SUMMARY_CSV}"
# call_id is a file-name stem. Read it as text so numeric-looking ids (e.g. "0012")
# are not re-typed to integers; otherwise they would no longer match the ids
# derived from the JSONL file names and would be written back altered.
df_sum = pd.read_csv(OUT_SUMMARY_CSV, dtype={"call_id": str})

# The augmented summary goes to its own file; the original CSV is left untouched.
NEW_SUMMARY_CSV = os.path.join(OUT_DIR, "gemini_summary.csv")

def _call_id_from_file_name(file_name: str) -> str:
    base = os.path.basename(str(file_name))
    return os.path.splitext(base)[0]

def _extract_nums_from_prof_list(prof_list):
    """Return (agent_nums, customer_nums) as sorted lists of 1-based utterance numbers."""
    agent_nums, customer_nums = [], []
    for p in (prof_list or []):
        try:
            spk = str(p.get("speaker", "")).strip().upper()
            idx0 = p.get("index", None)
            idx1 = (int(idx0) + 1) if isinstance(idx0, (int, float)) else None
            if idx1 is None:
                continue
            if spk == "AGENT":
                agent_nums.append(idx1)
            elif spk in {"CUSTOMER", "CALLER"}:
                customer_nums.append(idx1)
        except Exception:
            continue
    return sorted(agent_nums), sorted(customer_nums)

# --- Build lookup of call_id -> (agent_nums, customer_nums) ---
call_to_nums = {}
source_used = None

if os.path.exists(OUT_JSONL):
    source_used = "jsonl"
    skipped_lines = 0
    with open(OUT_JSONL, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = ujson.loads(line)
                # The main run writes 'original_file' and 'profanity'. The older
                # 'file_name' / 'profanity_list' names are still accepted so that
                # previously generated JSONL files keep working.
                file_name = rec.get("original_file") or rec.get("file_name") or "N_A"
                call_id = _call_id_from_file_name(file_name)
                prof_list = rec.get("profanity") or rec.get("profanity_list") or []
                agent_nums, customer_nums = _extract_nums_from_prof_list(prof_list)
                call_to_nums[call_id] = (agent_nums, customer_nums)
            except Exception:
                # Best effort: one bad line must not abort the run, but report it below
                skipped_lines += 1
                continue
    if skipped_lines:
        print(f"Warning: skipped {skipped_lines} unreadable line(s) in {OUT_JSONL}")
elif "profanity_list_json" in df_sum.columns:
    source_used = "csv_jsoncol"
    for _, row in df_sum.iterrows():
        try:
            prof_list = row["profanity_list_json"]
            if isinstance(prof_list, str):
                prof_list = json.loads(prof_list)
        except Exception:
            prof_list = []
        agent_nums, customer_nums = _extract_nums_from_prof_list(prof_list)
        call_to_nums[str(row.get("call_id", "N_A"))] = (agent_nums, customer_nums)
else:
    source_used = "none"

# --- Create augmented copy of the original summary without overwriting it ---
df_aug = df_sum.copy()

if source_used == "none":
    # No detailed source available; still create the new CSV with empty columns
    df_aug["agent_profane_utterance_nos"] = ""
    df_aug["customer_profane_utterance_nos"] = ""
    print(
        "Could not populate utterance numbers:\n"
        "- OUT_JSONL not found, and\n"
        "- 'profanity_list_json' column not present in the summary CSV.\n"
        "Saving a new CSV with empty utterance-number columns."
    )
else:
    agent_col = []
    cust_col = []
    for _, row in df_aug.iterrows():
        call_id = str(row.get("call_id", "N_A"))
        # Calls with no detailed record get empty utterance-number cells
        a_nums, c_nums = call_to_nums.get(call_id, ([], []))
        agent_col.append(", ".join(map(str, a_nums)) if a_nums else "")
        cust_col.append(", ".join(map(str, c_nums)) if c_nums else "")
    df_aug["agent_profane_utterance_nos"] = agent_col
    df_aug["customer_profane_utterance_nos"] = cust_col
    print(f"Utterance numbers populated from: {source_used}")

# Save to a NEW file
df_aug.to_csv(NEW_SUMMARY_CSV, index=False, encoding="utf-8")
print(f"New summary (with utterance numbers) saved to: {NEW_SUMMARY_CSV}")
display(df_aug.head())
