# In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.cortex import complete
import docx
from datetime import datetime
import os
import re

# Setup Snowflake session
def create_snowflake_session():
    """Create a Snowpark session for the SOP compliance database.

    Every connection setting is looked up in a ``SNOWFLAKE_<KEY>``
    environment variable first (for example ``SNOWFLAKE_PASSWORD``) and
    falls back to the value that was previously hard-coded here, so
    existing runs behave exactly as before when no variable is set.

    Returns:
        Whatever ``Session.builder.configs(...).create()`` returns.
    """
    # SECURITY: these fallbacks contain a plaintext password and the
    # ACCOUNTADMIN role. They are kept only for backward compatibility;
    # TODO: rotate the password, supply it through the environment, and
    # delete the literal from source control.
    defaults = {
        "account": "IW69072",
        "user": "AnalyticsInnovators",
        "password": "AnalyticsInnovators@123",
        "role": "ACCOUNTADMIN",
        "warehouse": "COMPUTE_WH",
        "database": "SOP_COMPLIANCE",
        "schema": "PUBLIC",
    }
    connection_parameters = {
        key: os.environ.get(f"SNOWFLAKE_{key.upper()}", fallback)
        for key, fallback in defaults.items()
    }
    return Session.builder.configs(connection_parameters).create()

# Download files from internal stage to temp and return paths
def download_files(session):
    """Fetch the SOP document and both prompt files from ``@my_stage``.

    The local directory ``temp_stage_files`` is created if needed, then
    ``session.file.get`` is called once per file with that directory as
    the target.

    Args:
        session: Object exposing ``file.get(stage_path, target_dir)``.

    Returns:
        A 3-tuple of local paths, in order: the SOP ``.docx``, the
        analysis prompts file, and the scoring prompts file.
    """
    temp_dir = "temp_stage_files"
    os.makedirs(temp_dir, exist_ok=True)

    # Order matters: callers unpack (document, prompts, score prompts).
    file_names = (
        "Synthetic_SOP_Compliant.docx",
        "gxP_prompts.txt",
        "score_prompts.txt",
    )

    local_paths = []
    for file_name in file_names:
        session.file.get(f"@my_stage/{file_name}", temp_dir)
        local_paths.append(os.path.join(temp_dir, file_name))

    return tuple(local_paths)

# Parse SOP document into sections
def parse_document(doc_path):
    """Split an SOP ``.docx`` file into named sections of text.

    Paragraphs are read in document order. A non-empty paragraph starts a
    new section when its stripped text begins with ``##`` or exactly
    equals one of the known section titles; every other non-empty
    paragraph is appended to the section currently open. Text that
    precedes the first heading is collected under ``"Header"``.

    Args:
        doc_path: Path passed to ``docx.Document``.

    Returns:
        Dict mapping section name to that section's paragraphs joined
        with newlines. ``"Header"`` is always present (possibly empty).
    """
    heading_titles = {
        "Revision History", "Introduction", "Purpose", "Scope",
        "Responsibilities", "Definitions", "Procedure", "References", "Approvals",
    }

    doc = docx.Document(doc_path)
    current_section = "Header"
    sections = {current_section: []}

    for para in doc.paragraphs:
        text = para.text.strip()
        if not text:
            continue
        if text.startswith("##") or text in heading_titles:
            current_section = text
            # setdefault rather than assignment: if the same heading shows
            # up again, keep the text already gathered under it instead of
            # silently discarding it.
            sections.setdefault(current_section, [])
        else:
            sections[current_section].append(text)

    return {name: "\n".join(lines) for name, lines in sections.items()}

# Load prompts from file with section headers
def load_prompts(prompt_path):
    """Read a prompts file and return ``{section name: prompt text}``.

    The file is read as UTF-8. An entry is a run of word characters and
    whitespace (the section name) followed by a parenthesised body (the
    prompt); the body ends at the first ``)`` encountered. Names and
    prompts are stripped of surrounding whitespace. If a name occurs more
    than once, the last entry wins.

    Args:
        prompt_path: Path of the prompts text file.

    Returns:
        Dict of section name to prompt text; empty if nothing matches.
    """
    with open(prompt_path, "r", encoding="utf-8") as handle:
        raw_text = handle.read()

    entry_re = re.compile(r"([\w\s]+)\s*\(\s*(.*?)\s*\)", re.DOTALL)

    prompts_by_section = {}
    for entry in entry_re.finditer(raw_text):
        section_name = entry.group(1).strip()
        prompts_by_section[section_name] = entry.group(2).strip()
    return prompts_by_section

# Analyze document using Cortex
def analyze_document(session, doc_name, prompts, score_prompts, sections):
    """Run the analysis and scoring prompts for every section via Cortex.

    For each ``(section, prompt)`` pair in ``prompts`` two ``complete``
    calls are made: one with the section text plus the analysis prompt,
    and one that feeds the first answer back together with the scoring
    rules for that section. Severity and score are then extracted from
    the second answer with regular expressions.

    Args:
        session: Passed through to ``complete`` as ``session``.
        doc_name: Document name stored in every result row.
        prompts: Mapping of section name to analysis prompt.
        score_prompts: Mapping of section name to scoring rules; a
            missing section uses an empty string.
        sections: Mapping of section name to section text; a missing
            section uses an empty string.

    Returns:
        List of tuples ``(doc_name, section, analysis_prompt, severity,
        score, ai_response, timestamp)``. ``timestamp`` is one naive-UTC
        ISO string shared by all rows of the run.
    """
    model_name = "claude-3-5-sonnet"
    run_timestamp = datetime.utcnow().isoformat()
    rows = []

    for section_name, analysis_prompt in prompts.items():
        content = sections.get(section_name, "")
        ai_response = complete(
            model=model_name,
            prompt=f"Section Content:\n{content}\n\nPrompt:\n{analysis_prompt}",
            session=session,
        )

        scoring_rules = score_prompts.get(section_name, "")
        score_response = complete(
            model=model_name,
            prompt=f"AI Response:\n{ai_response}\n\nBased on the following scoring rules, assign severity and score:\n{scoring_rules}",
            session=session,
        )

        # Fall back to the mildest rating when the model's reply does not
        # contain a recognisable "Severity: ..." line.
        found_severity = re.search(
            r"Severity\s*[:\-]?\s*(Critical|Major|Minor)", score_response, re.IGNORECASE
        )
        if found_severity:
            severity = found_severity.group(1).capitalize()
        else:
            severity = "Minor"

        # Likewise default to a score of 1 when no "Score: <digits>" is found.
        found_score = re.search(r"Score\s*[:\-]?\s*(\d+)", score_response)
        if found_score:
            score = int(found_score.group(1))
        else:
            score = 1

        rows.append(
            (doc_name, section_name, analysis_prompt, severity, score, ai_response, run_timestamp)
        )

    return rows

# Save results to Snowflake
def save_results(session, results):
    """Append analysis rows to the ``sop_compliance_results`` table.

    Args:
        session: Object providing ``create_dataframe``.
        results: Sequence of 7-item rows in the column order listed below.
    """
    column_names = [
        "document_name",
        "section",
        "issue",
        "severity",
        "score",
        "ai_response",
        "timestamp",
    ]
    results_df = session.create_dataframe(results, schema=column_names)
    appender = results_df.write.mode("append")
    appender.save_as_table("sop_compliance_results")

# Calculate and print compliance score
def print_compliance_score(results):
    """Print an overall compliance percentage for a set of result rows.

    The score of each row is read from index 4. The maximum is taken to
    be 3 points per row, and the printed value is
    ``100 - int(total / maximum * 100)``.

    Args:
        results: Sequence of result rows (score at index 4). When empty,
            an "N/A" line is printed instead of dividing by zero.
    """
    if not results:
        # No rows means a maximum of 0; report that explicitly rather
        # than raising ZeroDivisionError.
        print("Compliance Score: N/A (no results to score)")
        return

    total_score = sum(row[4] for row in results)
    max_score = len(results) * 3
    compliance_percent = 100 - int((total_score / max_score) * 100)
    print(f"Compliance Score: {compliance_percent}/100")

# Main execution
def main():
    """Run the full pipeline: download, parse, analyse, store, report.

    Steps, in order: open a session, download the document and prompt
    files, parse the document, load both prompt files, run the analysis,
    replace any rows already stored for this document, save the new rows
    and print the compliance score. The session is closed on exit, also
    when a step raises.
    """
    session = create_snowflake_session()
    try:
        doc_path, prompt_path, score_prompt_path = download_files(session)
        doc_name = "Synthetic_SOP_Compliant.docx"

        sections = parse_document(doc_path)
        prompts = load_prompts(prompt_path)
        score_prompts = load_prompts(score_prompt_path)
        results = analyze_document(session, doc_name, prompts, score_prompts, sections)

        # Remove the previous run's rows only once the new analysis has
        # succeeded, so a failure above does not leave the table without
        # any results for this document.
        existing = session.table("sop_compliance_results").filter(col("document_name") == doc_name)
        if existing.count() > 0:
            print("Document already analyzed. Replacing old results...")
            # doc_name is a constant defined above, not user input.
            session.sql(f"""
                DELETE FROM sop_compliance_results
                WHERE document_name = '{doc_name}'
            """).collect()

        save_results(session, results)
        print("Results inserted successfully.")
        print_compliance_score(results)
    finally:
        # Release the Snowflake connection even if a step failed.
        session.close()

# Run the script only when executed directly (a notebook cell also runs
# with __name__ == "__main__"); importing this module must not start the
# pipeline.
if __name__ == "__main__":
    main()


# In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
import pandas as pd

# Create Snowflake session
import os  # imported here because this cell's own import list does not include it

# Each setting is read from a SNOWFLAKE_<KEY> environment variable and
# falls back to the value that was previously hard-coded, so the cell
# still runs unchanged when no variable is set.
# SECURITY: the fallbacks contain a plaintext password and the
# ACCOUNTADMIN role. TODO: rotate the password, supply it through the
# environment, and delete the literal from source control.
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "IW69072"),
    "user": os.environ.get("SNOWFLAKE_USER", "AnalyticsInnovators"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", "AnalyticsInnovators@123"),
    "role": os.environ.get("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
    "database": os.environ.get("SNOWFLAKE_DATABASE", "SOP_COMPLIANCE"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"),
}
session = Session.builder.configs(connection_parameters).create()

# Load data from table
# Pull every stored analysis row into a pandas DataFrame. The column
# names used below are upper-case, as this code reads them from the
# result of to_pandas().
df = session.table("sop_compliance_results").to_pandas()

# If no data, exit
if df.empty:
    print("No data available in sop_compliance_results table.")
else:
    # Select latest document: the one owning the row with the greatest
    # TIMESTAMP value.
    latest_doc = df.sort_values("TIMESTAMP", ascending=False).iloc[0]["DOCUMENT_NAME"]
    doc_df = df[df["DOCUMENT_NAME"] == latest_doc]
    timestamp = doc_df["TIMESTAMP"].max()

    # Display header
    print(f"📄 Document: {latest_doc}")
    print(f"🕒 Last Analyzed: {timestamp}")

    # KPI: Compliance Score. Same formula as print_compliance_score:
    # 3 points maximum per row, subtracted from 100. doc_df cannot be
    # empty here because it contains at least the row latest_doc came from.
    total_score = doc_df["SCORE"].sum()
    max_score = len(doc_df) * 3
    compliance_score = 100 - int((total_score / max_score) * 100)
    # Print the compliance figure itself. Printing 100 - compliance_score
    # would show the non-compliance percentage under this label.
    print(f"\n✅ Compliance Score: {compliance_score}/100\n")

    # Section-wise breakdown
    print("📌 Section Analysis:\n")
    for _, row in doc_df.iterrows():
        print(f"🔍 Section: {row['SECTION']}")
        print(f"Score: {row['SCORE']} | Severity: {row['SEVERITY']}")
        print(f"AI Response:\n{row['AI_RESPONSE']}\n")
        print("-" * 80)
