# 12-Week Personalized Learning Lab: SOC 2 Document Analysis Automation

**Generated for:** TPRM Lead, Midsize B2B SaaS Company

**Goal:** Reduce SOC 2 review time from 3+ hours to 30-45 minutes through programmatic parsing

**Time Commitment:** 3 hours per week

**Your Context:**
- 500+ vendors requiring annual reviews
- Current manual process using Google Workspace
- Intermediate Python skills
- Board requesting metrics dashboard

---

## Phase 1: Foundation & Baseline (Weeks 1-2)

**Focus:** Understand document structure without coding

### Week 1: Manual Document Mapping

**Objective:** Create a mental model of SOC 2 report structure

**Activities:**
1. Take 3 recent SOC 2 Type 2 reports from different auditors (Big 4 if possible)
2. For each report, create a "map" document noting:
   - Where does the auditor opinion appear? (page number, heading text)
   - Where are report dates listed?
   - How are Trust Services Criteria (TSC) categories labeled?
   - Where do exceptions appear?
   - Where are complementary user entity controls (CUECs) mentioned?
   - Where are subservice organizations listed?

**Deliverable:** Google Doc with 3-column comparison table

**Time:** 2-3 hours

**Why This Matters:** You need to see patterns before you can code them. Different auditors format reports differently - this week you learn what's consistent vs. variable.

### Week 2: Current Process Baseline Measurement

**Objective:** Quantify your current manual process to measure future improvement

**Activities:**
1. Time yourself doing ONE complete SOC 2 review using your current method
2. Break down time spent:
   - Finding each section
   - Reading and interpreting content
   - Copying data to your tracking system
   - Cross-referencing with previous reviews
3. Document pain points (e.g., "Spent 20 minutes searching for subservice org list")

**Deliverable:** Baseline metrics document showing:
- Total time: X minutes
- Time breakdown by activity
- Top 3 time wasters

**Time:** 3+ hours (one full review + documentation)

**Why This Matters:** You'll use these numbers to calculate ROI when presenting to your board. Concrete before/after metrics are powerful.
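
The ROI arithmetic can be sketched in a few lines. The inputs below come from the plan header (500 vendors reviewed annually, roughly 3 hours per review today, a 30-45 minute target) and are assumptions until you replace them with your measured Week 2 baseline. The function name and parameters are illustrative only.

```python
def annual_hours_saved(vendor_count, baseline_minutes, target_minutes):
    """Hours saved per year if every vendor gets one review annually."""
    saved_minutes_per_review = baseline_minutes - target_minutes
    return vendor_count * saved_minutes_per_review / 60


# Assumed inputs: 500 annual reviews, 180-minute baseline, 45-minute target
hours = annual_hours_saved(vendor_count=500, baseline_minutes=180, target_minutes=45)
print(f"Estimated hours saved per year: {hours:.0f}")
```

Rerun it with the 30-minute target to show the board a range rather than a single number.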

---

## Phase 2: Build Core Extraction (Weeks 3-5)

**Focus:** Get Python working on actual PDFs

### Week 3: PDF Text Extraction Setup

**Objective:** Get Python to read SOC 2 PDFs and output raw text

**Setup:**

In [None]:
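# Install required library
# Run this in your terminal: pip install PyPDF2
# Note: PyPDF2 is no longer maintained; its successor, pypdf, provides the same PdfReader interface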

import PyPDF2

def extract_text_from_pdf(pdf_path):
    """
    Extracts all text from a PDF file.
    
    Args:
        pdf_path (str): Path to the PDF file
    
    Returns:
        str: All text content from the PDF
    """
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        num_pages = len(pdf_reader.pages)
        
        full_text = ""
        for page_num in range(num_pages):
            page = pdf_reader.pages[page_num]
            # Image-only pages may yield no text, so fall back to an empty string
            page_text = page.extract_text() or ""
            full_text += f"\n--- PAGE {page_num + 1} ---\n"
            full_text += page_text
        
        return full_text

# Test it out
test_path = "/path/to/your/soc2_report.pdf"  # UPDATE THIS PATH
text = extract_text_from_pdf(test_path)
print(f"Extracted {len(text)} characters from PDF")
print("\nFirst 500 characters:")
print(text[:500])

**Activities:**
1. Install PyPDF2 library
2. Run the code above on one of your SOC 2 reports
3. Save the output to a text file and review it
4. Note any formatting issues (tables might look weird, headers might be jumbled)

**Deliverable:** Working Python script that extracts text from SOC 2 PDFs

**Time:** 2-3 hours

**Troubleshooting:**
- If text looks garbled: Try `pdfplumber` library instead
- If you get permission errors: Make sure PDF isn't password protected

**Why This Matters:** You can't analyze what you can't read. This is your foundation.
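
Some SOC 2 PDFs are scanned images, in which case extraction returns little or no text and no regex will help. The sketch below flags such pages. It assumes you have collected each page's text into a list, and the 50-character threshold is an arbitrary starting point, not a tested value.

```python
def find_sparse_pages(page_texts, min_chars=50):
    """Return 1-based page numbers whose extracted text is suspiciously short."""
    sparse = []
    for page_number, page_text in enumerate(page_texts, start=1):
        # Treat a missing result the same as an empty string
        if len((page_text or "").strip()) < min_chars:
            sparse.append(page_number)
    return sparse


# Synthetic example: page 2 stands in for a scanned page with no text layer
sample_pages = [
    "Independent Service Auditor's Report " * 5,
    "",
    "Section III: System Description " * 5,
]
print(find_sparse_pages(sample_pages))
```

If most pages of a report are flagged, the file probably needs OCR before the Week 4 patterns can work on it.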

### Week 4: Section Finder with Pattern Matching

**Objective:** Teach Python to find specific sections using regex patterns

**Concept:** Regular expressions (regex) are patterns that match text. Think of them as smart search.

In [None]:
import re

def find_opinion_section(full_text):
    """
    Finds the auditor's opinion section.
    Common patterns: "Independent Service Auditor's Report", "Opinion"
    """
    # Match the report heading; re.IGNORECASE covers the all-caps variant,
    # and the character class accepts straight or curly apostrophes
    pattern = r"Independent Service Auditor['’]s Report"
    
    match = re.search(pattern, full_text, re.IGNORECASE)
    if match:
        start_pos = match.start()
        # Extract 2000 characters after the match (typical opinion length)
        opinion_text = full_text[start_pos:start_pos+2000]
        return opinion_text
    else:
        return "Opinion section not found"

def find_report_period(full_text):
    """
    Finds report period dates.
    Pattern: "January 1, 2023 to December 31, 2023" or similar
    """
    # Matches date ranges with various formats
    pattern = r"(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}\s+to\s+(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}"
    
    match = re.search(pattern, full_text)
    if match:
        return match.group(0)
    else:
        return "Report period not found"

# Test on your extracted text
opinion = find_opinion_section(text)
period = find_report_period(text)

print("OPINION SECTION:")
print(opinion[:300])  # First 300 chars
print("\nREPORT PERIOD:")
print(period)

**Activities:**
1. Test these functions on 2-3 different SOC 2 reports
2. Note when patterns DON'T match (different auditor formatting)
3. Adjust regex patterns to handle variations
4. Create finder functions for:
   - TSC categories mentioned
   - Exception sections
   - CUEC sections

**Deliverable:** Library of finder functions that work across multiple auditor formats

**Time:** 3-4 hours

**Regex Learning Resources:**
- regex101.com (test patterns visually)
- Worked example: to match "Report", then any single word, then a four-digit year, use `r"Report\s+\w+\s+\d{4}"`, which reads as "Report" + whitespace + word + whitespace + 4 digits

**Why This Matters:** Generic extraction isn't useful. You need to pinpoint specific data your stakeholders care about.
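
When a finder pattern matches, it helps to know which page it matched on so you can check it against your Week 1 map. The sketch below relies on the `--- PAGE n ---` markers written by the Week 3 extractor. The function name and the sample text are hypothetical.

```python
import re

# Same marker format that extract_text_from_pdf writes between pages
PAGE_MARKER = re.compile(r"\n--- PAGE (\d+) ---\n")


def find_pages_matching(full_text, pattern):
    """Return the page numbers whose text matches the given regex pattern."""
    # Splitting on a pattern with one capture group yields
    # [preamble, page_no, page_text, page_no, page_text, ...]
    parts = PAGE_MARKER.split(full_text)
    pages = []
    for i in range(1, len(parts) - 1, 2):
        page_number, page_text = int(parts[i]), parts[i + 1]
        if re.search(pattern, page_text, re.IGNORECASE):
            pages.append(page_number)
    return pages


sample = (
    "\n--- PAGE 1 ---\nTable of Contents"
    "\n--- PAGE 2 ---\nComplementary User Entity Controls\nUsers should..."
    "\n--- PAGE 3 ---\nSubservice Organizations"
)
print(find_pages_matching(sample, r"complementary user entity controls?|CUECs?"))
```

A heading that shows up on several pages usually means the first hit is the table of contents, which is worth knowing before you trust a `re.search` result.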

### Week 5: Structured Data Extraction

**Objective:** Extract specific data points and structure them for spreadsheet export

**Full Script:** `soc2_to_spreadsheet.py`

In [None]:
import PyPDF2
import re
import csv
from datetime import datetime

def extract_vendor_name(full_text):
    """
    Extracts vendor/service organization name.
    Usually appears early in the report.
    """
    # Look for common patterns in first 2000 characters
    header_text = full_text[:2000]
    
    # Pattern: "Report on [Company Name]'s"
    pattern1 = r"Report on ([A-Z][\w\s,.']+?)['’]s"  # accepts straight or curly apostrophes
    match1 = re.search(pattern1, header_text)
    if match1:
        return match1.group(1).strip()
    
    # Pattern: Company name in title case on first page
    pattern2 = r"([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s+(?:System|Controls)"
    match2 = re.search(pattern2, header_text)
    if match2:
        return match2.group(1).strip()
    
    return "Unknown Vendor"

def extract_report_period(full_text):
    """
    Extracts start and end dates of report period.
    Returns dict with start_date and end_date.
    """
    # Match: "Month DD, YYYY to Month DD, YYYY"
    pattern = r"(January|February|March|April|May|June|July|August|September|October|November|December)\s+(\d{1,2}),\s+(\d{4})\s+to\s+(January|February|March|April|May|June|July|August|September|October|November|December)\s+(\d{1,2}),\s+(\d{4})"
    
    match = re.search(pattern, full_text[:5000])  # Check first 5000 chars
    if match:
        start_month, start_day, start_year = match.group(1), match.group(2), match.group(3)
        end_month, end_day, end_year = match.group(4), match.group(5), match.group(6)
        
        return {
            "start_date": f"{start_month} {start_day}, {start_year}",
            "end_date": f"{end_month} {end_day}, {end_year}",
            "period_length_days": calculate_period_length(start_month, start_day, start_year, end_month, end_day, end_year)
        }
    
    return {"start_date": "Not Found", "end_date": "Not Found", "period_length_days": 0}

def extract_opinion_type(full_text):
    """
    Determines if opinion is Unqualified, Qualified, Adverse, or Disclaimer.
    """
    # Find opinion section
    # Character classes accept straight or curly apostrophes, both common in PDF text
    opinion_pattern = r"(Independent Service Auditor['’]s Report.*?)(Description of|Management['’]s Responsibilities)"
    opinion_match = re.search(opinion_pattern, full_text, re.DOTALL | re.IGNORECASE)
    
    if not opinion_match:
        return "Opinion Not Found"
    
    opinion_section = opinion_match.group(1)
    
    # Check for qualification language
    if re.search(r"qualified opinion|except for|with the exception", opinion_section, re.IGNORECASE):
        return "Qualified"
    elif re.search(r"adverse opinion|do not present fairly", opinion_section, re.IGNORECASE):
        return "Adverse"
    elif re.search(r"disclaimer|unable to obtain|scope limitation", opinion_section, re.IGNORECASE):
        return "Disclaimer"
    elif re.search(r"in our opinion.*fairly|presents fairly|appropriately designed|operating effectively", opinion_section, re.IGNORECASE):
        return "Unqualified (Clean)"
    
    return "Unable to Determine"

def extract_tsc_categories(full_text):
    """
    Identifies which Trust Services Criteria categories are covered.
    Returns list of categories found.
    """
    categories = []
    
    category_patterns = {
        "Security": r"\bSecurity\b.*?(?:criteria|principle|trust service)",
        "Availability": r"\bAvailability\b.*?(?:criteria|principle|trust service)",
        "Processing Integrity": r"\b(?:Processing Integrity|Process Integrity)\b",
        "Confidentiality": r"\bConfidentiality\b.*?(?:criteria|principle|trust service)",
        "Privacy": r"\bPrivacy\b.*?(?:criteria|principle|trust service)"
    }
    
    for category, pattern in category_patterns.items():
        if re.search(pattern, full_text[:10000], re.IGNORECASE):
            categories.append(category)
    
    return categories if categories else ["No TSC Categories Identified"]

def extract_exception_count(full_text):
    """
    Counts testing exceptions mentioned in the report.
    Returns count and list of exception descriptions if found.
    """
    # Look for exception section
    exception_section_pattern = r"(Tests of Controls.*?Results of Tests|Testing Exceptions?|Deviations Noted)"
    exception_match = re.search(exception_section_pattern, full_text, re.DOTALL | re.IGNORECASE)
    
    if not exception_match:
        # Check for "no exceptions noted" language
        if re.search(r"no exceptions? (?:were|was) noted|without exception", full_text, re.IGNORECASE):
            return {"count": 0, "exceptions": ["No exceptions noted"]}
        return {"count": 0, "exceptions": ["Exception section not found"]}
    
    exception_text = exception_match.group(0)
    
    # Match list items: a bullet (•, -, *) or a number followed by "." or ")",
    # capturing the first sentence of each item
    item_pattern = r"(?:^|\n)\s*(?:[•\-*]|\d+[.)])\s*([^.\n]+)"
    items = re.findall(item_pattern, exception_text)
    count = len(items)
    
    # Keep descriptions for at most 5 items
    exception_descriptions = [item.strip() for item in items[:5]]
    
    return {"count": count, "exceptions": exception_descriptions}

def extract_cuecs(full_text):
    """
    Identifies Complementary User Entity Controls.
    Returns count and list of CUECs if found.
    """
    cuec_pattern = r"(Complementary (?:User Entity )?Controls?|CUEC).*?(?=\n\n|Description of|Management)"
    cuec_match = re.search(cuec_pattern, full_text, re.DOTALL | re.IGNORECASE)
    
    if not cuec_match:
        return {"count": 0, "cuecs": ["No CUECs found"]}
    
    cuec_section = cuec_match.group(0)
    
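    # Match list items: an optional "CUEC" label, then a bullet (•, -, *)
    # or a number followed by "." or ")", capturing the first sentence
    item_pattern = r"(?:^|\n)\s*(?:CUEC\s*)?(?:[•\-*]|\d+[.)])\s*([^.\n]+)"
    items = re.findall(item_pattern, cuec_section)
    count = len(items)
    
    # Keep descriptions for at most 5 items
    cuec_descriptions = [item.strip() for item in items[:5]]
    
    return {"count": count, "cuecs": cuec_descriptions}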

def extract_subservice_orgs(full_text):
    """
    Identifies subservice organizations mentioned.
    Returns list of org names.
    """
    subservice_pattern = r"(Subservice Organi[zs]ations?|Sub-?service Providers?).*?(?=\n\n|Description of|Management)"
    subservice_match = re.search(subservice_pattern, full_text, re.DOTALL | re.IGNORECASE)
    
    if not subservice_match:
        if re.search(r"no subservice organi[zs]ations?", full_text, re.IGNORECASE):
            return ["No subservice organizations"]
        return ["Subservice org section not found"]
    
    subservice_section = subservice_match.group(0)
    
    # Extract org names (typically capitalized company names)
    org_names = re.findall(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,3})\b", subservice_section)
    
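    # Drop matches made up entirely of generic words (e.g. "Subservice Organizations").
    # Matches are always multi-word, so compare word by word rather than whole strings.
    common_words = {"Subservice", "Organization", "Organizations", "Provider", "Providers",
                    "Services", "Controls", "Management"}
    org_names = [org for org in org_names if not all(word in common_words for word in org.split())]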
    
    return org_names[:10] if org_names else ["Unable to extract org names"]

def calculate_period_length(start_month, start_day, start_year, end_month, end_day, end_year):
    """
    Calculates length of report period in days.
    """
    try:
        start = datetime.strptime(f"{start_month} {start_day}, {start_year}", "%B %d, %Y")
        end = datetime.strptime(f"{end_month} {end_day}, {end_year}", "%B %d, %Y")
        return (end - start).days
    except ValueError:  # strptime raises ValueError for unparseable dates
        return 0

def process_soc2_report(pdf_path):
    """
    Main function that extracts all data points from a SOC 2 report.
    """
    # Extract text
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
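        page_texts = []
        for page in pdf_reader.pages:
            # Image-only pages may yield no text, so fall back to an empty string
            page_texts.append(page.extract_text() or "")
        # Join with newlines so words at page boundaries don't run together
        full_text = "\n".join(page_texts)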
    
    # Extract all data points
    vendor_name = extract_vendor_name(full_text)
    report_period = extract_report_period(full_text)
    opinion_type = extract_opinion_type(full_text)
    tsc_categories = extract_tsc_categories(full_text)
    exception_data = extract_exception_count(full_text)
    cuec_data = extract_cuecs(full_text)
    subservice_orgs = extract_subservice_orgs(full_text)
    
    # Structure data
    return {
        "Vendor Name": vendor_name,
        "Report Start Date": report_period["start_date"],
        "Report End Date": report_period["end_date"],
        "Period Length (Days)": report_period["period_length_days"],
        "Opinion Type": opinion_type,
        "TSC Categories": ", ".join(tsc_categories),
        "Exception Count": exception_data["count"],
        "Exceptions": "; ".join(exception_data["exceptions"][:3]),  # First 3
        "CUEC Count": cuec_data["count"],
        "CUECs": "; ".join(cuec_data["cuecs"][:3]),  # First 3
        "Subservice Organizations": ", ".join(subservice_orgs[:5])  # First 5
    }

def write_to_csv(data, output_path):
    """
    Writes extracted data to CSV file.
    """
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = list(data.keys())
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        
        writer.writeheader()
        writer.writerow(data)
    
    print(f"Data written to {output_path}")

# USAGE EXAMPLE
if __name__ == "__main__":
    pdf_path = "/path/to/soc2_report.pdf"  # UPDATE THIS
    output_csv = "/path/to/output.csv"  # UPDATE THIS
    
    print("Processing SOC 2 report...")
    extracted_data = process_soc2_report(pdf_path)
    
    print("\nExtracted Data:")
    for key, value in extracted_data.items():
        print(f"{key}: {value}")
    
    write_to_csv(extracted_data, output_csv)
    print("\nDone!")

**Activities:**
1. Save the code above as `soc2_to_spreadsheet.py`
2. Update the file paths in the usage example
3. Run on 3 different SOC 2 reports
4. Review the CSV output - what's accurate? What needs refinement?
5. Adjust regex patterns based on your specific vendors' report formats

**Deliverable:** Python script that outputs structured CSV data

**Time:** 3-4 hours

**Testing Tips:**
- Start with one function at a time
- Print intermediate results to see what's being matched
- Compare script output to manual review for accuracy

**Why This Matters:** Unstructured text becomes actionable data you can analyze at scale.
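
The "compare script output to manual review" tip can be turned into a repeatable check. This is a minimal sketch: the function name is hypothetical and the sample values are invented for illustration. In practice `manual` would hold what you recorded by hand for the same report.

```python
def compare_to_manual(extracted, manual):
    """Return (accuracy, mismatches) for the fields present in the manual review."""
    mismatches = {}
    for field, expected in manual.items():
        actual = extracted.get(field, "")
        # Compare case-insensitively and ignore surrounding whitespace
        if str(actual).strip().lower() != str(expected).strip().lower():
            mismatches[field] = {"expected": expected, "actual": actual}
    accuracy = 1 - len(mismatches) / len(manual) if manual else 0.0
    return accuracy, mismatches


# Hypothetical values for illustration only
manual_review = {"Vendor Name": "Example Vendor", "Opinion Type": "Qualified", "Exception Count": 2}
script_output = {"Vendor Name": "Example Vendor", "Opinion Type": "Unqualified (Clean)", "Exception Count": 2}

accuracy, mismatches = compare_to_manual(script_output, manual_review)
print(f"Field accuracy: {accuracy:.0%}")
for field, values in mismatches.items():
    print(f"  {field}: expected {values['expected']!r}, got {values['actual']!r}")
```

Tracking this number per auditor format shows which regex patterns need attention first.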

---

## Phase 3: Scale & Integrate (Weeks 6-8)

**Focus:** Process multiple reports and connect to your workflow

### Week 6: Batch Processing Multiple Reports

**Objective:** Process an entire folder of SOC 2 reports in one run

**Full Script:** `batch_soc2_processor.py`

In [None]:
import os
import csv
from soc2_to_spreadsheet import process_soc2_report
import logging
from datetime import datetime

# Setup logging
logging.basicConfig(
    filename='soc2_batch_processing.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def find_pdf_files(directory):
    """
    Finds all PDF files in a directory (including subdirectories).
    """
    pdf_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.lower().endswith('.pdf'):
                pdf_files.append(os.path.join(root, file))
    return pdf_files

def process_batch(input_directory, output_csv):
    """
    Processes all SOC 2 reports in a directory and outputs to single CSV.
    
    Args:
        input_directory (str): Path to folder containing SOC 2 PDFs
        output_csv (str): Path for output CSV file
    """
    print(f"Scanning {input_directory} for PDF files...")
    pdf_files = find_pdf_files(input_directory)
    print(f"Found {len(pdf_files)} PDF files\n")
    
    if not pdf_files:
        print("No PDF files found. Exiting.")
        return
    
    all_data = []
    errors = []
    
    for i, pdf_path in enumerate(pdf_files, 1):
        filename = os.path.basename(pdf_path)
        print(f"[{i}/{len(pdf_files)}] Processing: {filename}")
        logging.info(f"Processing: {pdf_path}")
        
        try:
            # Process the report
            data = process_soc2_report(pdf_path)
            
            # Add metadata
            data["Source File"] = filename
            data["File Path"] = pdf_path
            data["Processing Date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            
            all_data.append(data)
            print(f"  ✓ Success: {data['Vendor Name']}\n")
            logging.info(f"Successfully processed: {filename}")
            
        except Exception as e:
            error_msg = f"Error processing {filename}: {str(e)}"
            print(f"  ✗ {error_msg}\n")
            logging.error(error_msg)
            errors.append({"file": filename, "error": str(e)})
            
            # Add error row to maintain record
            error_data = {
                "Source File": filename,
                "File Path": pdf_path,
                "Processing Date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "Vendor Name": "ERROR",
                "Opinion Type": f"Processing failed: {str(e)}"
            }
            all_data.append(error_data)
    
    # Write all data to CSV
    if all_data:
        write_batch_to_csv(all_data, output_csv)
        print(f"\n{'='*60}")
        print(f"BATCH PROCESSING COMPLETE")
        print(f"{'='*60}")
        print(f"Total files: {len(pdf_files)}")
        print(f"Successful: {len(pdf_files) - len(errors)}")
        print(f"Errors: {len(errors)}")
        print(f"Output saved to: {output_csv}")
        
        if errors:
            print(f"\nErrors logged to: soc2_batch_processing.log")
            print("\nFailed files:")
            for error in errors:
                print(f"  - {error['file']}: {error['error']}")

def write_batch_to_csv(data_list, output_path):
    """
    Writes list of data dictionaries to CSV.
    Handles missing fields gracefully.
    """
    if not data_list:
        return
    
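    # Keep the Week 5 column order so spreadsheet column letters stay stable
    # (the Week 7 dashboard formulas reference columns by letter, e.g. E and G);
    # alphabetical sorting would move "Vendor Name" out of column A
    preferred_order = [
        "Vendor Name", "Report Start Date", "Report End Date", "Period Length (Days)",
        "Opinion Type", "TSC Categories", "Exception Count", "Exceptions",
        "CUEC Count", "CUECs", "Subservice Organizations",
        "Source File", "File Path", "Processing Date",
    ]
    
    # Get all unique field names from all records
    all_fields = set()
    for data in data_list:
        all_fields.update(data.keys())
    
    # Known columns first, then any unexpected fields in alphabetical order
    fieldnames = preferred_order + sorted(all_fields - set(preferred_order))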
    
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        
        for data in data_list:
            # Fill missing fields with empty string
            complete_data = {field: data.get(field, "") for field in fieldnames}
            writer.writerow(complete_data)

# USAGE EXAMPLE
if __name__ == "__main__":
    # UPDATE THESE PATHS
    input_dir = "/path/to/soc2_reports_folder"  # Folder with your SOC 2 PDFs
    output_csv = "/path/to/batch_output.csv"  # Where to save results
    
    print("SOC 2 Batch Processor")
    print("=" * 60)
    print(f"Input directory: {input_dir}")
    print(f"Output CSV: {output_csv}")
    print("=" * 60 + "\n")
    
    process_batch(input_dir, output_csv)

**Activities:**
1. Create a test folder with 5-10 SOC 2 reports
2. Run the batch processor
3. Review the output CSV and log file
4. Identify common errors (encoding issues, malformed PDFs, etc.)
5. Add error handling for edge cases you discover

**Deliverable:** Batch processing script that handles 50+ reports reliably

**Time:** 3-4 hours

**Testing Strategy:**
- Run on small batches first (5 reports)
- Then medium (20 reports)
- Then your full vendor library

**Common Issues & Solutions:**
- **Encoding errors**: Use `encoding='utf-8', errors='ignore'` in file operations
- **Memory issues**: Process files one at a time (already implemented)
- **Timeout**: Add time.sleep(0.5) between files if needed

**Why This Matters:** You can now process your entire vendor portfolio in one run. This is where 3 hours becomes 30 minutes.
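
A batch run is only as useful as your ability to spot the rows it got wrong. The sketch below scans the batch CSV for the placeholder strings the Week 5 extractors return when a pattern fails, so those reports can be queued for manual review. The function name is hypothetical, and the in-memory CSV stands in for your real `batch_output.csv`.

```python
import csv
import io

# Placeholder strings returned by the extractors when nothing was matched
PLACEHOLDERS = ("Not Found", "Unknown Vendor", "Opinion Not Found", "Unable to Determine", "ERROR")


def rows_needing_review(csv_file):
    """Yield (source file, problem fields) for rows containing placeholder values."""
    for row in csv.DictReader(csv_file):
        problems = [field for field, value in row.items() if value in PLACEHOLDERS]
        if problems:
            yield row.get("Source File", ""), problems


# In-memory stand-in for batch_output.csv
sample_csv = io.StringIO(
    "Vendor Name,Opinion Type,Report End Date,Source File\n"
    'Example Vendor,Unqualified (Clean),"December 31, 2023",a.pdf\n'
    "Unknown Vendor,Unable to Determine,Not Found,b.pdf\n"
)
for source_file, problems in rows_needing_review(sample_csv):
    print(f"{source_file}: check {', '.join(problems)}")
```

With a real file, pass `open(path, newline='', encoding='utf-8')` in place of the `StringIO` object.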

### Week 7: Google Sheets Integration

**Objective:** Automatically upload processed data to Google Sheets for stakeholder access

**Setup Requirements:**
1. Enable Google Sheets API in Google Cloud Console
2. Create service account and download credentials JSON
3. Install libraries: `pip install gspread oauth2client`

**Full Script:** `upload_to_gsheets.py`

In [None]:
import gspread
import csv
from datetime import datetime

def setup_gsheets_connection(credentials_json_path):
    """
    Sets up connection to Google Sheets API.
    
    Args:
        credentials_json_path (str): Path to your service account credentials JSON
    
    Returns:
        gspread.Client: Authorized client object
    """
    # gspread's built-in helper authenticates through google-auth (installed with
    # gspread) and requests the Sheets and Drive scopes by default; it replaces
    # the deprecated oauth2client flow
    client = gspread.service_account(filename=credentials_json_path)
    
    return client

def upload_csv_to_sheet(csv_path, sheet_url, credentials_json_path, worksheet_name="SOC2 Data"):
    """
    Uploads CSV data to Google Sheets.
    Creates new worksheet if doesn't exist.
    
    Args:
        csv_path (str): Path to CSV file
        sheet_url (str): URL of Google Sheet
        credentials_json_path (str): Path to credentials JSON
        worksheet_name (str): Name for the worksheet
    """
    print("Connecting to Google Sheets...")
    client = setup_gsheets_connection(credentials_json_path)
    
    # Open the spreadsheet
    print(f"Opening spreadsheet...")
    spreadsheet = client.open_by_url(sheet_url)
    
    # Read CSV data
    print(f"Reading CSV data from {csv_path}...")
    with open(csv_path, 'r', encoding='utf-8') as f:
        csv_reader = csv.reader(f)
        data = list(csv_reader)
    
    print(f"Found {len(data)} rows (including header)")
    
    # Check if worksheet exists
    try:
        worksheet = spreadsheet.worksheet(worksheet_name)
        print(f"Worksheet '{worksheet_name}' found. Clearing existing data...")
        worksheet.clear()
    except gspread.WorksheetNotFound:
        print(f"Creating new worksheet '{worksheet_name}'...")
        worksheet = spreadsheet.add_worksheet(title=worksheet_name, rows=len(data)+10, cols=len(data[0])+5)
    
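    # Upload data; USER_ENTERED lets Sheets parse numbers and dates so the
    # dashboard formulas (AVERAGE, COUNTIF) can operate on them. Note that any
    # extracted text starting with "=" would then be treated as a formula.
    print("Uploading data...")
    worksheet.update(range_name='A1', values=data, value_input_option='USER_ENTERED')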
    
    # Format header row
    print("Formatting...")
    worksheet.format('A1:Z1', {
        "backgroundColor": {"red": 0.2, "green": 0.4, "blue": 0.7},
        "textFormat": {"foregroundColor": {"red": 1.0, "green": 1.0, "blue": 1.0}, "bold": True}
    })
    
    # Freeze header row
    worksheet.freeze(rows=1)
    
    print(f"\n✓ Upload complete!")
    print(f"View at: {spreadsheet.url}")
    
    return spreadsheet.url

def create_dashboard_worksheet(sheet_url, credentials_json_path, data_worksheet="SOC2 Data"):
    """
    Creates a dashboard worksheet with summary formulas.
    """
    client = setup_gsheets_connection(credentials_json_path)
    spreadsheet = client.open_by_url(sheet_url)
    
    # Create dashboard worksheet
    try:
        dashboard = spreadsheet.worksheet("Dashboard")
        dashboard.clear()
    except gspread.WorksheetNotFound:
        dashboard = spreadsheet.add_worksheet(title="Dashboard", rows=30, cols=10)
    
    # Dashboard content
    dashboard_data = [
        ["SOC 2 VENDOR RISK DASHBOARD", "", "", ""],
        [f"Last Updated: {datetime.now().strftime('%Y-%m-%d %H:%M')}", "", "", ""],
        [""],
        ["KEY METRICS", "", "", ""],
        ["Total Vendors Reviewed", f"=COUNTA('{data_worksheet}'!A2:A)"],
        ["Unqualified Opinions", f"=COUNTIF('{data_worksheet}'!E:E,\"Unqualified*\")"],
        ["Qualified/Adverse Opinions", f"=COUNTIF('{data_worksheet}'!E:E,\"Qualified\")+COUNTIF('{data_worksheet}'!E:E,\"Adverse\")"],
        ["Average Exception Count", f"=AVERAGE('{data_worksheet}'!G:G)"],
        ["Vendors with 0 Exceptions", f"=COUNTIF('{data_worksheet}'!G:G,0)"],
        [""],
        ["TSC CATEGORY COVERAGE", ""],
        ["Security", f"=COUNTIF('{data_worksheet}'!F:F,\"*Security*\")"],
        ["Availability", f"=COUNTIF('{data_worksheet}'!F:F,\"*Availability*\")"],
        ["Confidentiality", f"=COUNTIF('{data_worksheet}'!F:F,\"*Confidentiality*\")"],
        ["Processing Integrity", f"=COUNTIF('{data_worksheet}'!F:F,\"*Processing Integrity*\")"],
        ["Privacy", f"=COUNTIF('{data_worksheet}'!F:F,\"*Privacy*\")"],
        [""],
        ["RISK INDICATORS", ""],
        ["Vendors with >5 Exceptions", f"=COUNTIF('{data_worksheet}'!G:G,\">5\")"],
        ["Vendors with Subservice Orgs", f"=COUNTA('{data_worksheet}'!K2:K)-COUNTIF('{data_worksheet}'!K:K,\"*No subservice*\")"],
        # Column D holds the audit period length, which is not the same as report age
        ["Audit Periods > 365 Days", f"=COUNTIF('{data_worksheet}'!D:D,\">365\")"],
    ]
    
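    # USER_ENTERED is required here: with the default RAW option the formulas
    # would be stored as plain text instead of being evaluated
    dashboard.update(range_name='A1', values=dashboard_data, value_input_option='USER_ENTERED')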
    
    # Format dashboard
    dashboard.format('A1:D1', {
        "backgroundColor": {"red": 0.1, "green": 0.2, "blue": 0.5},
        "textFormat": {"foregroundColor": {"red": 1.0, "green": 1.0, "blue": 1.0}, "bold": True, "fontSize": 14}
    })
    
    dashboard.format('A4:A4', {"textFormat": {"bold": True}})
    dashboard.format('A11:A11', {"textFormat": {"bold": True}})
    dashboard.format('A18:A18', {"textFormat": {"bold": True}})
    
    print("✓ Dashboard created!")

# USAGE EXAMPLE
if __name__ == "__main__":
    # UPDATE THESE
    csv_file = "/path/to/batch_output.csv"
    sheet_url = "https://docs.google.com/spreadsheets/d/YOUR_SHEET_ID/edit"
    credentials_json = "/path/to/credentials.json"
    
    # Upload data
    upload_csv_to_sheet(csv_file, sheet_url, credentials_json)
    
    # Create dashboard
    create_dashboard_worksheet(sheet_url, credentials_json)

**Activities:**
1. Set up Google Cloud project and enable Sheets API
2. Create service account and download credentials
3. Share your target Google Sheet with the service account email
4. Run upload script on your batch output
5. Verify data appears correctly in Google Sheets
6. Run dashboard creation script

**Deliverable:** Automated pipeline from PDF → CSV → Google Sheets with dashboard

**Time:** 3-4 hours (includes Google Cloud setup)

**Setup Guide:**
1. Go to console.cloud.google.com
2. Create new project or select existing
3. Enable "Google Sheets API" and "Google Drive API"
4. Create credentials → Service account
5. Download JSON key file
6. Share your Google Sheet with the service account email (found in JSON)

**Why This Matters:** Your leadership can now see live vendor risk data without asking you for reports.
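
The dashboard formulas refer to columns by letter (E for Opinion Type, G for Exception Count). If the CSV column order ever changes, those letters silently point at the wrong data. One way to guard against that, sketched below with a hypothetical helper, is to derive the letter from the header name when building each formula.

```python
def column_letter(headers, column_name):
    """Convert a header name to its spreadsheet column letter (A, B, ..., Z, AA, ...)."""
    index = headers.index(column_name) + 1  # 1-based column number
    letters = ""
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters = chr(ord("A") + remainder) + letters
    return letters


# Header order produced by process_soc2_report in Week 5
headers = [
    "Vendor Name", "Report Start Date", "Report End Date", "Period Length (Days)",
    "Opinion Type", "TSC Categories", "Exception Count", "Exceptions",
    "CUEC Count", "CUECs", "Subservice Organizations",
]
col = column_letter(headers, "Exception Count")
formula = f"=COUNTIF('SOC2 Data'!{col}:{col},0)"
print(formula)
```

In the upload script, `headers` could be taken from the first row of the CSV instead of being typed out.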

### Week 8: Dashboard Building & Visualization

**Objective:** Create executive-ready visualizations in Google Sheets

**Activities:**
1. **Add Conditional Formatting:**
   - Exception Count column: Green (0), Yellow (1-3), Red (4+)
   - Opinion Type: Green (Unqualified), Red (Qualified/Adverse)
   - Period Length: Yellow if >365 days (audit period longer than one year; report age should be measured from the Report End Date instead)

2. **Create Charts:**
   - Pie chart: Opinion Type distribution
   - Bar chart: Exception Count by vendor (top 10)
   - Column chart: TSC Category coverage
   - Histogram chart: Report age distribution

3. **Add Filter Views:**
   - High Risk: Qualified opinions OR >5 exceptions
   - Needs Renewal: Reports >365 days old
   - Clean Reports: Unqualified opinion AND 0 exceptions

4. **Create Summary Tab:**
   - Use formulas from dashboard creation script
   - Add trend tracking (compare to previous quarter)
   - Include top 5 riskiest vendors table

**Deliverable:** Board-ready Google Sheets dashboard

**Time:** 2-3 hours

**Formulas You'll Use:**
```
=COUNTIF(range, criteria)  # Count items matching condition
=AVERAGE(range)  # Calculate average
=SORT(range, column, TRUE/FALSE)  # Sort data
=FILTER(range, condition)  # Filter data
```

**Presentation Tips:**
- Use your organization's color scheme
- Keep it simple - 3-5 key metrics on first page
- Include "Last Updated" timestamp
- Add brief text explaining what each metric means

**Why This Matters:** This is what you show the board. You're translating technical data into business value.
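
The filter-view rules listed above can be prototyped in Python before you build them in Sheets, which makes edge cases easier to test. This is a sketch under assumptions: the function name is hypothetical, report age is measured from the Report End Date, and dates use the "Month DD, YYYY" format produced in Week 5.

```python
from datetime import date, datetime


def classify_report(opinion_type, exception_count, report_end_date, today):
    """Apply the High Risk / Needs Renewal / Clean rules to one report."""
    labels = []
    if opinion_type.startswith("Qualified") or exception_count > 5:
        labels.append("High Risk")
    # Report age is measured from the end of the audit period
    end = datetime.strptime(report_end_date, "%B %d, %Y").date()
    if (today - end).days > 365:
        labels.append("Needs Renewal")
    if opinion_type.startswith("Unqualified") and exception_count == 0:
        labels.append("Clean")
    return labels


# Fixed "today" so the example is reproducible
today = date(2025, 3, 1)
print(classify_report("Unqualified (Clean)", 0, "December 31, 2023", today))
print(classify_report("Qualified", 2, "September 30, 2024", today))
```

A report can carry more than one label, for example a clean opinion that is nonetheless due for renewal.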

---

## Phase 4: Advanced Analysis (Weeks 9-10)

**Focus:** Deeper insights and risk scoring

### Week 9: Exception Pattern Analysis

**Objective:** Categorize and analyze types of exceptions to identify common control weaknesses

**Full Script:** `exception_analyzer.py`

In [None]:
import csv
import re
from collections import Counter

# Exception category keywords
EXCEPTION_CATEGORIES = {
    "Access Control": ["access", "authentication", "authorization", "password", "user", "login", "privilege"],
    "Change Management": ["change", "deployment", "release", "migration", "update", "patch"],
    "Monitoring": ["log", "monitoring", "alert", "detection", "review", "audit trail"],
    "Backup & Recovery": ["backup", "recovery", "restore", "disaster", "continuity"],
    "Segregation of Duties": ["segregation", "separation", "duties", "SOD", "conflict"],
    "Documentation": ["documentation", "policy", "procedure", "evidence", "record"],
    "Testing": ["testing", "test", "validation", "verification"],
    "Vendor Management": ["vendor", "third party", "subservice", "supplier", "outsourced"]
}

def categorize_exception(exception_text):
    """
    Categorizes an exception based on keyword matching.
    Returns list of matching categories.
    """
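    exception_lower = exception_text.lower()
    categories = []
    
    for category, keywords in EXCEPTION_CATEGORIES.items():
        for keyword in keywords:
            # Lowercase the keyword (so "SOD" can match) and anchor it at a
            # word start so "test" does not match inside "latest"
            if re.search(r"\b" + re.escape(keyword.lower()), exception_lower):
                categories.append(category)
                break  # Don't double-count same category
    
    return categories if categories else ["Other/Uncategorized"]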

def analyze_exceptions_from_csv(csv_path):
    """
    Analyzes exception patterns from batch processing CSV.
    """
    print("Analyzing exception patterns...\n")
    
    # Read data
    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        data = list(reader)
    
    all_categories = []
    vendor_categories = {}  # Track categories per vendor
    
    for row in data:
        vendor = row.get('Vendor Name', 'Unknown')
        exceptions_text = row.get('Exceptions', '')
        
        if exceptions_text and exceptions_text != "No exceptions noted":
            # Split multiple exceptions (separated by semicolons)
            individual_exceptions = exceptions_text.split(';')
            
            vendor_categories[vendor] = []
            
            for exception in individual_exceptions:
                exception = exception.strip()
                if exception:
                    categories = categorize_exception(exception)
                    all_categories.extend(categories)
                    vendor_categories[vendor].extend(categories)
    
    # Calculate statistics
    category_counts = Counter(all_categories)
    total_exceptions = len(all_categories)
    
    print("="*60)
    print("EXCEPTION CATEGORY ANALYSIS")
    print("="*60)
    print(f"Total exceptions analyzed: {total_exceptions}\n")
    
    print("Category Breakdown:")
    print("-" * 60)
    for category, count in category_counts.most_common():
        percentage = (count / total_exceptions) * 100
        print(f"{category:<30} {count:>5} ({percentage:>5.1f}%)")
    
    # Find vendors with most diverse exception types
    print("\n" + "="*60)
    print("VENDORS WITH MOST DIVERSE EXCEPTIONS")
    print("="*60)
    vendor_diversity = {vendor: len(set(cats)) for vendor, cats in vendor_categories.items()}
    sorted_vendors = sorted(vendor_diversity.items(), key=lambda x: x[1], reverse=True)[:10]
    
    for vendor, diversity_count in sorted_vendors:
        print(f"{vendor:<40} {diversity_count} different categories")
    
    return category_counts, vendor_categories

def generate_exception_report(csv_path, output_path):
    """
    Generates detailed exception analysis report.
    """
    category_counts, vendor_categories = analyze_exceptions_from_csv(csv_path)
    
    # Write to CSV
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["Category", "Count", "Percentage"])
        
        total = sum(category_counts.values())
        for category, count in category_counts.most_common():
            percentage = (count / total) * 100
            writer.writerow([category, count, f"{percentage:.1f}%"])
    
    print(f"\nDetailed report saved to: {output_path}")

# USAGE
if __name__ == "__main__":
    input_csv = "/path/to/batch_output.csv"  # Your SOC 2 data CSV
    output_csv = "/path/to/exception_analysis.csv"
    
    generate_exception_report(input_csv, output_csv)

**Activities:**
1. Run exception analyzer on your batch output
2. Review category distribution - which exception types are most common?
3. Identify vendors with diverse exception patterns (higher risk)
4. Add custom categories relevant to your organization
5. Create executive summary of findings
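
For activity 4, note that plain substring checks make `log` match `login` and `test` match `attestation`. The sketch below is one possible variant that matches whole words or phrases instead. `CUSTOM_CATEGORIES`, its keywords, `compile_patterns`, and `categorize_with_boundaries` are illustrative names, not part of `exception_analyzer.py`.

```python
import re

# Hypothetical categories; replace with ones that matter to your organization.
CUSTOM_CATEGORIES = {
    "Encryption": ["encryption", "encrypted", "key rotation", "tls"],
    "Monitoring": ["log", "logs", "logging", "monitoring", "alert"],
}

def compile_patterns(categories):
    """Builds one case-insensitive, whole-word regex per category."""
    patterns = {}
    for category, keywords in categories.items():
        alternatives = "|".join(re.escape(k) for k in keywords)
        patterns[category] = re.compile(rf"\b(?:{alternatives})\b", re.IGNORECASE)
    return patterns

def categorize_with_boundaries(exception_text, patterns):
    """Returns every category whose pattern matches a whole word or phrase."""
    matches = [cat for cat, pattern in patterns.items() if pattern.search(exception_text)]
    return matches or ["Other/Uncategorized"]

patterns = compile_patterns(CUSTOM_CATEGORIES)
print(categorize_with_boundaries("Login attempts were not rate limited", patterns))
print(categorize_with_boundaries("TLS key rotation was not performed; logs were not reviewed", patterns))
```

The trade-off is that whole-word matching ignores inflections such as "alerts" unless you list them, so expect to add variants as you review the uncategorized exceptions.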

**Deliverable:** Exception pattern analysis report

**Time:** 2-3 hours

**Analysis Questions:**
- Which exception categories appear most frequently?
- Are certain auditors more likely to flag specific issues?
- Which vendors have the most concerning exception profiles?
- What does this tell you about common industry control gaps?
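
The auditor question can be explored with a simple cross-tabulation. This is a minimal sketch that assumes your batch output has an `Auditor` column, which the analyzer above does not read; `categories_by_auditor`, `toy_categorize`, and the sample rows are hypothetical.

```python
from collections import Counter, defaultdict

def categories_by_auditor(rows, categorize):
    """Counts exception categories per auditor.

    rows: dicts with 'Auditor' (assumed column) and 'Exceptions' (semicolon-separated).
    categorize: callable mapping one exception string to a list of categories.
    """
    counts = defaultdict(Counter)
    for row in rows:
        auditor = row.get("Auditor") or "Unknown"
        exceptions_text = row.get("Exceptions", "")
        if not exceptions_text or exceptions_text == "No exceptions noted":
            continue
        for exception in exceptions_text.split(";"):
            exception = exception.strip()
            if exception:
                counts[auditor].update(categorize(exception))
    return counts

# Stand-in categorizer so the sketch runs on its own; swap in categorize_exception.
def toy_categorize(text):
    return ["Access Control"] if "access" in text.lower() else ["Other/Uncategorized"]

# Made-up rows for illustration only.
sample_rows = [
    {"Auditor": "Firm A", "Exceptions": "Terminated user access not revoked; Backup test skipped"},
    {"Auditor": "Firm A", "Exceptions": "Access review not performed"},
    {"Auditor": "Firm B", "Exceptions": "No exceptions noted"},
]

for auditor, counter in categories_by_auditor(sample_rows, toy_categorize).items():
    print(auditor, dict(counter))
```

Treat the result as a prompt for further review: with a small sample, differences between auditors may simply reflect which vendors each firm audits.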

**Why This Matters:** You move from "this vendor has 3 exceptions" to "this vendor has systemic access control issues across multiple areas."

### Week 10: Automated Risk Scoring

**Objective:** Create quantitative risk scores (0-100) based on SOC 2 findings

**Scoring Methodology:**
- Base Score: 100 (perfect)
- Deductions:
  - Opinion Type: Unqualified (0), Qualified (-30), Disclaimer (-50), Adverse (-60)
  - Exceptions: -5 per exception (cap at -40)
  - CUECs: -2 per CUEC (cap at -20)
  - Subservice Orgs: -5 per org (cap at -15)
  - Report Age: -1 per month over 12 months old (cap at -10)
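
As a worked example of the arithmetic above, consider a hypothetical vendor with a qualified opinion, 3 exceptions, 12 CUECs, 2 subservice organizations, and a report that is 14 months old. The helper `capped` and the sample values are for illustration only.

```python
def capped(count, per_item, cap):
    """Deduction of per_item points per count, limited to cap points."""
    return min(count * per_item, cap)

# Hypothetical vendor used only to walk through the methodology.
opinion_deduction = 30                      # Qualified opinion
exception_deduction = capped(3, 5, 40)      # 3 exceptions  -> 15
cuec_deduction = capped(12, 2, 20)          # 12 CUECs      -> 24, capped at 20
subservice_deduction = capped(2, 5, 15)     # 2 orgs        -> 10
age_deduction = capped(14 - 12, 1, 10)      # 2 months over -> 2

total_deduction = (opinion_deduction + exception_deduction + cuec_deduction
                   + subservice_deduction + age_deduction)
score = max(100 - total_deduction, 0)       # Scores never drop below 0
print(f"Deductions: {total_deduction}, score: {score}")  # Deductions: 77, score: 23
```

Because the caps sum to more than 100 when an adverse opinion is involved (60 + 40 + 20 + 15 + 10 = 145), the floor at 0 matters for the worst-case vendors.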

**Full Script:** `risk_score_calculator.py`

```python
import csv
import re

def calculate_risk_score(vendor_data):
    """
    Calculates risk score (0-100) based on SOC 2 findings.
    Lower score = higher risk.

    Args:
        vendor_data (dict): Data from SOC 2 processing

    Returns:
        dict: Score, breakdown, risk level
    """
    score = 100
    deductions = []

    # 1. Opinion Type
    # "unqualified" contains "qualified", so match "qualified" as a whole word only
    opinion = (vendor_data.get('Opinion Type') or '').lower()
    if re.search(r'\bqualified\b', opinion):
        score -= 30
        deductions.append("Qualified opinion: -30")
    elif 'adverse' in opinion:
        score -= 60
        deductions.append("Adverse opinion: -60")
    elif 'disclaimer' in opinion:
        score -= 50
        deductions.append("Disclaimer opinion: -50")

    # 2. Exception Count
    try:
        exception_count = int(vendor_data.get('Exception Count', 0))
        exception_deduction = min(exception_count * 5, 40)  # Cap at -40
        if exception_deduction > 0:
            score -= exception_deduction
            deductions.append(f"{exception_count} exceptions: -{exception_deduction}")
    except (ValueError, TypeError):
        pass  # Missing or non-numeric count: no deduction

    # 3. CUEC Count
    try:
        cuec_count = int(vendor_data.get('CUEC Count', 0))
        cuec_deduction = min(cuec_count * 2, 20)  # Cap at -20
        if cuec_deduction > 0:
            score -= cuec_deduction
            deductions.append(f"{cuec_count} CUECs: -{cuec_deduction}")
    except (ValueError, TypeError):
        pass  # Missing or non-numeric count: no deduction

    # 4. Subservice Organizations
    subservice_text = vendor_data.get('Subservice Organizations', '')
    if subservice_text and "No subservice" not in subservice_text:
        # Count number of orgs (comma-separated)
        subservice_count = len([s.strip() for s in subservice_text.split(',') if s.strip()])
        subservice_deduction = min(subservice_count * 5, 15)  # Cap at -15
        if subservice_deduction > 0:
            score -= subservice_deduction
            deductions.append(f"{subservice_count} subservice orgs: -{subservice_deduction}")

    # 5. Report Age
    # NOTE: 'Period Length (Days)' measures how long the audit period is, so this is
    # only a proxy for age; derive age from the period end date if your CSV has one.
    try:
        period_length = int(vendor_data.get('Period Length (Days)', 0))
        if period_length > 365:
            months_over = (period_length - 365) // 30
            age_deduction = min(months_over, 10)  # Cap at -10
            if age_deduction > 0:
                score -= age_deduction
                deductions.append(f"{months_over} months over 12: -{age_deduction}")
    except (ValueError, TypeError):
        pass  # Missing or non-numeric length: no deduction

    score = max(score, 0)  # Don't go below 0

    # Determine risk level
    if score >= 80:
        risk_level = "Low"
    elif score >= 60:
        risk_level = "Medium"
    elif score >= 40:
        risk_level = "High"
    else:
        risk_level = "Critical"

    return {
        "score": score,
        "risk_level": risk_level,
        "deductions": deductions
    }

def add_risk_scores_to_csv(input_csv, output_csv):
    """
    Reads SOC 2 data CSV, calculates risk scores, writes enhanced CSV.
    """
    print("Calculating risk scores...\n")

    with open(input_csv, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        data = list(reader)

    if not data:
        print(f"No rows found in {input_csv}; nothing written.")
        return

    # Calculate scores
    for row in data:
        risk_data = calculate_risk_score(row)
        row['Risk Score'] = risk_data['score']
        row['Risk Level'] = risk_data['risk_level']
        row['Score Breakdown'] = '; '.join(risk_data['deductions']) if risk_data['deductions'] else 'No deductions'

    # Sort by risk score (lowest first = highest risk)
    data.sort(key=lambda x: x['Risk Score'])

    # Write enhanced CSV
    fieldnames = list(data[0].keys())

    with open(output_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print(f"✓ Risk scores added to {output_csv}\n")

    # Print summary
    print("="*60)
    print("RISK SCORE SUMMARY")
    print("="*60)

    risk_levels = {'Low': 0, 'Medium': 0, 'High': 0, 'Critical': 0}
    for row in data:
        level = row.get('Risk Level', 'Unknown')
        if level in risk_levels:
            risk_levels[level] += 1

    total = len(data)
    print(f"Total vendors: {total}\n")
    for level, count in risk_levels.items():
        percentage = (count / total * 100) if total > 0 else 0
        print(f"{level:<12} {count:>4} ({percentage:>5.1f}%)")

    print("\n" + "="*60)
    print("TOP 10 HIGHEST RISK VENDORS")
    print("="*60)
    for i, row in enumerate(data[:10], 1):
        vendor = row.get('Vendor Name', 'Unknown')
        score = row.get('Risk Score', 'N/A')
        level = row.get('Risk Level', 'N/A')
        print(f"{i:>2}. {vendor:<35} Score: {score:>3} ({level})")

# USAGE
if __name__ == "__main__":
    input_csv = "/path/to/batch_output.csv"
    output_csv = "/path/to/batch_output_with_scores.csv"

    add_risk_scores_to_csv(input_csv, output_csv)
```

**Activities:**
1. Run risk scorer on your batch output
2. Review score distribution - does it match your intuition?
3. Adjust scoring weights based on your organization's risk appetite
4. Add additional factors relevant to your context
5. Re-upload scored data to Google Sheets
6. Update dashboard to show risk score distribution
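
For activity 4, one candidate factor is report age measured from the end of the audit period rather than from the period's length. The sketch below assumes a hypothetical `Period End` value in `YYYY-MM-DD` format; the function name, column, and date format are assumptions to adapt to whatever your batch output actually contains.

```python
from datetime import date, datetime

def report_age_deduction(period_end_text, today=None, fmt="%Y-%m-%d"):
    """Returns points to deduct: 1 per full month beyond 12 months, capped at 10.

    period_end_text: end date of the audit period (assumed format: YYYY-MM-DD).
    today: override for the current date, mainly for testing.
    """
    today = today or date.today()
    try:
        period_end = datetime.strptime(period_end_text.strip(), fmt).date()
    except (ValueError, AttributeError):
        return 0  # Unparseable or missing date: no deduction, review manually
    months_old = (today.year - period_end.year) * 12 + (today.month - period_end.month)
    if today.day < period_end.day:
        months_old -= 1  # Current month is not complete yet
    return min(max(months_old - 12, 0), 10)

# Period ended 2023-06-30, evaluated on 2024-09-15: 14 full months old, 2 over.
print(report_age_deduction("2023-06-30", today=date(2024, 9, 15)))  # 2
```

Returning 0 for an unreadable date keeps the batch running, but it also hides the problem, so consider logging those vendors for manual follow-up.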

**Deliverable:** Risk-scored vendor dataset with methodology documentation

**Time:** 3-4 hours

**Validation:**
- Pick 5 vendors you know well
- Compare their calculated scores to your manual assessment
- Adjust weights if scores don't align with reality
- Document your reasoning for weight choices
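
The validation steps above can be sketched as a small comparison helper. The vendor names, scores, and the 15-point tolerance are placeholders; `compare_scores` is a hypothetical function, not part of `risk_score_calculator.py`.

```python
def compare_scores(calculated, manual, tolerance=15):
    """Lists vendors whose calculated score differs from the manual score by more than tolerance.

    calculated, manual: dicts mapping vendor name to a 0-100 score.
    Returns (vendor, calculated, manual, difference) tuples, largest gap first.
    """
    mismatches = []
    for vendor, manual_score in manual.items():
        if vendor not in calculated:
            continue  # No calculated score to compare against
        diff = calculated[vendor] - manual_score
        if abs(diff) > tolerance:
            mismatches.append((vendor, calculated[vendor], manual_score, diff))
    return sorted(mismatches, key=lambda m: abs(m[3]), reverse=True)

# Placeholder vendors and scores for illustration only.
calculated = {"Vendor A": 85, "Vendor B": 45, "Vendor C": 70}
manual = {"Vendor A": 80, "Vendor B": 75, "Vendor C": 50}

for vendor, calc, man, diff in compare_scores(calculated, manual):
    print(f"{vendor}: calculated {calc}, manual {man}, difference {diff:+d}")
```

If most differences share the same sign, the weights are probably too harsh or too lenient overall; mixed signs suggest a specific factor is over- or under-weighted.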

**Why This Matters:** Stakeholders love numbers. "Vendor X has a risk score of 45" is more actionable than "Vendor X has some concerns."

---

## Phase 5: Production & Communication (Weeks 11-12)

**Focus:** Make it sustainable and presentable

### Week 11: Production Runbook & Documentation

**Objective:** Create documentation so you (or others) can run this process repeatedly

**Runbook Contents:**

````markdown
# SOC 2 Document Analysis - Production Runbook

## Overview
Automated pipeline to extract key data from SOC 2 Type 2 reports and generate risk-scored vendor dashboard.

**Time Required:** 30-45 minutes (vs 3+ hours manual)
**Frequency:** Quarterly or as new reports received

## Prerequisites
- Python 3.8+ installed
- Required libraries: PyPDF2, gspread, oauth2client
- Google Sheets API credentials (credentials.json)
- SOC 2 PDF reports stored in designated folder

## Step-by-Step Process

### 1. Prepare Input Files (5 min)
- Place new SOC 2 PDFs in: /path/to/soc2_reports/
- Naming convention: VendorName_YYYY_SOC2.pdf
- Verify no password-protected files

### 2. Run Batch Processor (15-20 min)
```bash
python batch_soc2_processor.py
```
- Monitor output for errors
- Check log file: soc2_batch_processing.log
- Review CSV output: batch_output.csv

### 3. Run Exception Analyzer (3 min)
```bash
python exception_analyzer.py
```
- Outputs: exception_analysis.csv

### 4. Calculate Risk Scores (2 min)
```bash
python risk_score_calculator.py
```
- Outputs: batch_output_with_scores.csv

### 5. Upload to Google Sheets (5 min)
```bash
python upload_to_gsheets.py
```
- Verify data uploaded correctly
- Check dashboard formulas still working

### 6. Quality Review (5-10 min)
- Spot-check 3-5 high-risk vendors
- Verify exception categorization makes sense
- Confirm risk scores align with expectations
- Note any systematic errors for script refinement

## Troubleshooting

**Error: "PDF extraction failed"**
- Check if PDF is corrupted (try opening manually)
- Verify file permissions
- Try converting to text-based PDF if scanned image

**Error: "Google Sheets authentication failed"**
- Verify credentials.json is in correct location
- Check service account has edit access to sheet
- Re-download credentials if needed

**Inaccurate Extraction**
- Identify auditor/format causing issues
- Update regex patterns in soc2_to_spreadsheet.py
- Add auditor-specific handling if needed

## Maintenance

**Monthly:**
- Review error logs
- Update regex patterns as new auditor formats emerge

**Quarterly:**
- Validate risk scoring weights still appropriate
- Refine exception categorization keywords

**Annually:**
- Review SOC 2 reporting standards for changes
- Update extraction logic if TSC criteria change

## Contact
Questions or issues: [Your Name] ([your email])
````
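
Step 1 of the runbook depends on the `VendorName_YYYY_SOC2.pdf` naming convention, which is easy to check before a batch run. This is a minimal sketch; it assumes vendor names contain only letters, digits, and hyphens, and `check_filenames`, `check_folder`, and the sample file names are hypothetical.

```python
import re
from pathlib import Path

# Assumption: vendor names use letters, digits, and hyphens with no underscores.
FILENAME_PATTERN = re.compile(r"^(?P<vendor>[A-Za-z0-9-]+)_(?P<year>\d{4})_SOC2\.pdf$")

def check_filenames(names):
    """Splits file names into (valid, invalid) according to the naming convention."""
    valid, invalid = [], []
    for name in names:
        (valid if FILENAME_PATTERN.match(name) else invalid).append(name)
    return valid, invalid

def check_folder(folder):
    """Applies check_filenames to every PDF directly inside folder."""
    return check_filenames(sorted(p.name for p in Path(folder).glob("*.pdf")))

# Made-up file names for illustration only.
valid, invalid = check_filenames(["Acme_2024_SOC2.pdf", "acme soc2 final.pdf", "Globex_24_SOC2.pdf"])
print("OK:", valid)
print("Rename before processing:", invalid)
```

Calling `check_folder("/path/to/soc2_reports/")` before Step 2 would surface misnamed files early, instead of as missing vendors in the output CSV.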

**Activities:**
1. Create runbook document
2. Test by following runbook yourself start-to-finish
3. Have a colleague attempt to run it using only the runbook
4. Incorporate their feedback
5. Add to your team's knowledge base
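
When testing the runbook end to end, a small driver that runs steps 2-5 in order and stops at the first failure can help. The script names come from the runbook; `run_steps` is a hypothetical helper, and the demo uses harmless stand-in commands so the sketch runs without the real scripts.

```python
import subprocess
import sys

# Script names from the runbook; each is assumed to live in the current directory.
PIPELINE = [
    "batch_soc2_processor.py",
    "exception_analyzer.py",
    "risk_score_calculator.py",
    "upload_to_gsheets.py",
]

def run_steps(commands):
    """Runs each command in order and stops at the first non-zero exit code.

    commands: list of argument lists, e.g. [[sys.executable, "script.py"], ...]
    Returns a list of (command, returncode) for the steps that actually ran.
    """
    results = []
    for command in commands:
        completed = subprocess.run(command)
        results.append((command, completed.returncode))
        if completed.returncode != 0:
            print(f"Step failed with exit code {completed.returncode}: {' '.join(command)}")
            break
    return results

# Demo with stand-in commands; the second one fails on purpose.
demo = [
    [sys.executable, "-c", "print('step 1 ok')"],
    [sys.executable, "-c", "import sys; sys.exit(3)"],
    [sys.executable, "-c", "print('never runs')"],
]
demo_results = run_steps(demo)

# For the real pipeline:
# run_steps([[sys.executable, script] for script in PIPELINE])
```

Stopping on the first failure avoids uploading scores computed from an incomplete or stale `batch_output.csv`.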

**Deliverable:** Production-ready runbook

**Time:** 2-3 hours

**Why This Matters:** Six months from now when you've forgotten the details, this runbook will save you.

### Week 12: Executive Presentation

**Objective:** Create stakeholder-facing presentation demonstrating value

**Presentation Outline:**

**Slide 1: Title**
- "Automated SOC 2 Risk Analysis"
- "Reducing Vendor Review Time by 75%"

**Slide 2: The Problem**
- Manual SOC 2 reviews took 3+ hours per vendor
- 500+ vendors = 1,500+ hours annually
- Inconsistent analysis across reviewers
- No trend tracking or portfolio-level insights

**Slide 3: The Solution**
- Automated document parsing and data extraction
- Standardized risk scoring methodology
- Real-time dashboard for leadership visibility

**Slide 4: Results**
- Review time: 3 hours → 30-45 minutes (75-83% reduction)
- Annual time savings: 1,125 hours (500 vendors at 45 minutes each)
- Consistent data extraction: identical rules applied to every report
- Portfolio risk visibility in real-time

**Slide 5: Key Insights Unlocked**
- Top exception categories across vendor portfolio
- Risk score distribution and trends
- Identification of highest-risk vendors
- Systematic analysis not possible with manual review

**Slide 6: Dashboard Demo**
- Screenshot of Google Sheets dashboard
- Highlight key metrics and visualizations

**Slide 7: ROI**
- Time saved: 1,125 hours annually
- Cost savings: [Your hourly rate × 1,125 hours]
- Quality improvement: Consistent, data-driven decisions
- Scalability: Can handle growing vendor portfolio

**Slide 8: Next Steps**
- Expand to other document types (ISO 27001, PCI DSS)
- Integrate with GRC platform
- Build predictive risk modeling

**Activities:**
1. Create presentation deck
2. Prepare live demo of dashboard
3. Calculate specific ROI numbers for your organization
4. Practice 10-minute version and 30-minute version
5. Anticipate questions and prepare answers
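
For activity 3, the ROI figures in the outline follow from simple arithmetic. This sketch reproduces them with the guide's numbers; the hourly rate is a placeholder, and `annual_review_hours` is a hypothetical helper.

```python
def annual_review_hours(vendor_count, hours_per_review):
    """Total hours spent on reviews per year, assuming one review per vendor."""
    return vendor_count * hours_per_review

vendors = 500
manual_hours = annual_review_hours(vendors, 3.0)        # 1,500 hours
automated_hours = annual_review_hours(vendors, 0.75)    # 45 minutes each -> 375 hours
hours_saved = manual_hours - automated_hours            # 1,125 hours
reduction = hours_saved / manual_hours                  # 0.75

hourly_rate = 85.0  # Placeholder; substitute your fully loaded hourly cost
print(f"Hours saved: {hours_saved:,.0f} ({reduction:.0%} reduction)")
print(f"Estimated cost savings: ${hours_saved * hourly_rate:,.0f}")
```

Using 45 minutes per review gives the conservative 75% figure; at 30 minutes the reduction is about 83%. Replace both inputs with your measured Week 2 baseline and your actual post-automation timings before presenting.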

**Deliverable:** Board-ready presentation with demo

**Time:** 3-4 hours

**Presentation Tips:**
- Lead with business value, not technical details
- Use before/after comparisons
- Show actual vendor data (anonymize if needed)
- Be ready to explain "what could go wrong"

**Why This Matters:** This presentation gets you promoted. You've demonstrated technical capability, business value, and strategic thinking.

---

## Lab Complete! 🎉

### What You've Built:
- ✅ Complete SOC 2 document parsing pipeline
- ✅ Automated batch processing for 500+ vendors
- ✅ Exception pattern analysis framework
- ✅ Risk scoring methodology
- ✅ Executive dashboard with real-time data
- ✅ Production documentation and runbook
- ✅ Board-ready presentation

### Skills Developed:
- Python PDF processing
- Regular expressions for text extraction
- CSV data manipulation
- Google Sheets API integration
- Batch file processing
- Risk scoring methodology design
- Technical documentation
- Executive communication

### Measurable Impact:
- **Time Savings:** 75-83% reduction (3 hours → 30-45 minutes)
- **Consistency:** Identical extraction rules applied to every report
- **Scalability:** 500+ vendors processed in single run
- **Visibility:** Real-time portfolio risk dashboard
- **Analysis Depth:** Pattern recognition impossible with manual review

### What's Next?

**Short Term (Next Month):**
- Run on your full vendor portfolio
- Present to leadership
- Gather feedback and refine

**Medium Term (Next Quarter):**
- Expand to ISO 27001, PCI DSS documents
- Integrate with your GRC platform
- Train team members on the system

**Long Term (Next Year):**
- Build predictive risk modeling
- Automate remediation workflows
- Publish your methodology (conference talk, blog post)

### Questions or Issues?
Return to any week and refine as needed. This is your system - adapt it to your evolving needs.

**Congratulations on completing the lab! 🚀**