In [1]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import os

BASE_DIR = "/content/drive/MyDrive/AI PolicyGuard"

folders = [
    "policies",
    "data",
    "engine",
    "reports"
]

for folder in folders:
    os.makedirs(os.path.join(BASE_DIR, folder), exist_ok=True)

BASE_DIR


'/content/drive/MyDrive/AI PolicyGuard'

In [3]:
import json

policy_rules = {
    "policies": [
        {
            "policy_id": "POL-001",
            "name": "Unauthorized Data Access",
            "condition": {
                "action": "access_data",
                "data_sensitivity": "confidential",
                "authorized": False
            },
            "risk": "HIGH",
            "description": "Accessing confidential data without authorization"
        },
        {
            "policy_id": "POL-002",
            "name": "Excessive Login Attempts",
            "condition": {
                "action": "login",
                "attempts_gt": 5
            },
            "risk": "MEDIUM",
            "description": "Multiple failed login attempts detected"
        },
        {
            "policy_id": "POL-003",
            "name": "After-Hours System Access",
            "condition": {
                "action": "system_access",
                "outside_business_hours": True
            },
            "risk": "LOW",
            "description": "System accessed outside approved working hours"
        }
    ]
}

policy_path = f"{BASE_DIR}/policies/policy_rules.json"
with open(policy_path, "w") as f:
    json.dump(policy_rules, f, indent=4)

policy_path


'/content/drive/MyDrive/AI PolicyGuard/policies/policy_rules.json'

In [4]:
simulated_actions = {
    "actions": [
        {
            "user_id": "EMP102",
            "action": "access_data",
            "data_sensitivity": "confidential",
            "authorized": False,
            "timestamp": "2025-01-12 22:14"
        },
        {
            "user_id": "EMP205",
            "action": "login",
            "failed_attempts": 7,
            "timestamp": "2025-01-12 10:30"
        },
        {
            "user_id": "EMP311",
            "action": "system_access",
            "outside_business_hours": True,
            "timestamp": "2025-01-12 23:50"
        }
    ]
}

actions_path = f"{BASE_DIR}/data/simulated_actions.json"
with open(actions_path, "w") as f:
    json.dump(simulated_actions, f, indent=4)

actions_path


'/content/drive/MyDrive/AI PolicyGuard/data/simulated_actions.json'

In [5]:
policy_engine_code = """
def detect_violations(actions, policies):
    violations = []

    for act in actions:
        for pol in policies:
            condition = pol["condition"]
            matched = True

            for key, value in condition.items():
                if key.endswith("_gt"):
                    real_key = key.replace("_gt", "")
                    if act.get(real_key, 0) <= value:
                        matched = False
                else:
                    if act.get(key) != value:
                        matched = False

            if matched:
                violations.append({
                    "user_id": act["user_id"],
                    "policy_id": pol["policy_id"],
                    "policy_name": pol["name"],
                    "risk": pol["risk"],
                    "description": pol["description"],
                    "timestamp": act["timestamp"]
                })

    return violations
"""

with open(f"{BASE_DIR}/engine/policy_engine.py", "w") as f:
    f.write(policy_engine_code)


In [6]:
risk_engine_code = """
def risk_score(level):
    mapping = {
        "LOW": 30,
        "MEDIUM": 60,
        "HIGH": 90
    }
    return mapping.get(level, 0)
"""

with open(f"{BASE_DIR}/engine/risk_engine.py", "w") as f:
    f.write(risk_engine_code)


In [7]:
explanation_engine_code = """
def explain_violation(v):
    return (
        f"User {v['user_id']} violated policy {v['policy_id']} "
        f"({v['policy_name']}) because: {v['description']}."
    )
"""

with open(f"{BASE_DIR}/engine/explanation_engine.py", "w") as f:
    f.write(explanation_engine_code)


In [8]:
import sys
sys.path.append(f"{BASE_DIR}/engine")

from policy_engine import detect_violations
from risk_engine import risk_score
from explanation_engine import explain_violation

with open(policy_path) as f:
    policies = json.load(f)["policies"]

with open(actions_path) as f:
    actions = json.load(f)["actions"]

violations = detect_violations(actions, policies)

final_report = []
for v in violations:
    v["risk_score"] = risk_score(v["risk"])
    v["explanation"] = explain_violation(v)
    final_report.append(v)

final_report


[{'user_id': 'EMP102',
  'policy_id': 'POL-001',
  'policy_name': 'Unauthorized Data Access',
  'risk': 'HIGH',
  'description': 'Accessing confidential data without authorization',
  'timestamp': '2025-01-12 22:14',
  'risk_score': 90,
  'explanation': 'User EMP102 violated policy POL-001 (Unauthorized Data Access) because: Accessing confidential data without authorization.'},
 {'user_id': 'EMP311',
  'policy_id': 'POL-003',
  'policy_name': 'After-Hours System Access',
  'risk': 'LOW',
  'description': 'System accessed outside approved working hours',
  'timestamp': '2025-01-12 23:50',
  'risk_score': 30,
  'explanation': 'User EMP311 violated policy POL-003 (After-Hours System Access) because: System accessed outside approved working hours.'}]

In [9]:
report_path = f"{BASE_DIR}/reports/sample_violation_report.json"

with open(report_path, "w") as f:
    json.dump(final_report, f, indent=4)

report_path


'/content/drive/MyDrive/AI PolicyGuard/reports/sample_violation_report.json'

In [10]:
!pip install streamlit pyngrok




In [11]:
app_code = """
import json
import sys
import streamlit as st

BASE_DIR = "."

sys.path.append(f"{BASE_DIR}/engine")

from policy_engine import detect_violations
from risk_engine import risk_score
from explanation_engine import explain_violation

st.set_page_config(page_title="PolicyGuard", layout="wide")

st.title("üõ°Ô∏è PolicyGuard ‚Äì AI Policy Violation & Risk Analyzer")
st.caption("Enterprise Governance & Compliance Intelligence System")

# Load data
with open("policies/policy_rules.json") as f:
    policies = json.load(f)["policies"]

with open("data/simulated_actions.json") as f:
    actions = json.load(f)["actions"]

# Run analysis
violations = detect_violations(actions, policies)

final_report = []
for v in violations:
    v["risk_score"] = risk_score(v["risk"])
    v["explanation"] = explain_violation(v)
    final_report.append(v)

# Metrics
col1, col2, col3 = st.columns(3)
col1.metric("Total Policies", len(policies))
col2.metric("Total Actions", len(actions))
col3.metric("Violations Detected", len(final_report))

st.divider()

st.subheader("üö® Detected Policy Violations")

if final_report:
    for v in final_report:
        with st.expander(f"{v['policy_id']} | {v['policy_name']} | Risk: {v['risk']}"):
            st.write("üë§ **User:**", v["user_id"])
            st.write("üïí **Timestamp:**", v["timestamp"])
            st.write("‚ö†Ô∏è **Risk Score:**", v["risk_score"])
            st.write("üß† **Explanation:**")
            st.info(v["explanation"])
else:
    st.success("No policy violations detected.")
"""

with open("/content/drive/MyDrive/AI PolicyGuard/app.py", "w") as f:
    f.write(app_code)


In [12]:
!pip install -q streamlit


In [13]:
!streamlit run "/content/drive/MyDrive/AI PolicyGuard/app.py" \
  --server.enableCORS false \
  --server.enableXsrfProtection false



Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://35.221.137.14:8501[0m
[0m
[34m  Stopping...[0m
Traceback (most recent call last):
  File "/usr/local/bin/streamlit", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/click/core.py", line 1485, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/click/core.py", line 1406, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/click/core.py", line 1873, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  Fil

In [14]:
from google.colab import output

output.serve_kernel_port_as_window(
    port=8501,
    path="/",
    window_name="PolicyGuard Web App"
)


TypeError: serve_kernel_port_as_window() got an unexpected keyword argument 'window_name'

In [None]:
from google.colab import output
print(
    output.eval_js(
        "google.colab.kernel.proxyPort(8501, {'cache': true})"
    )
)
