# Certificate Revocation Dashboard

This dashboard displays certificate issuance and revocation activity from the lab.

## Log Files
- **Revocation Log**: Records all certificate revocations
- **Issuance Log**: Records all certificate issuances

In [None]:
import os
import re
import time
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from IPython.display import display, clear_output, HTML
import ipywidgets as widgets

In [None]:
# Log file paths
LOG_DIR = Path('/home/jovyan/logs')
REVOCATION_LOG = LOG_DIR / 'dogtag-revocation.log'
ISSUANCE_LOG = LOG_DIR / 'dogtag-issuance.log'

print(f"Log Directory: {LOG_DIR}")
print(f"Directory exists: {LOG_DIR.exists()}")
if LOG_DIR.exists():
    print(f"Contents: {list(LOG_DIR.iterdir())}")

## Log Parsing Functions

In [None]:
def parse_log_line(line):
    """Parse a log line in the format: timestamp | KEY=VALUE | KEY=VALUE ..."""
    parts = line.strip().split(' | ')
    if len(parts) < 2:
        return None
    
    record = {'timestamp': parts[0]}
    for part in parts[1:]:
        if '=' in part:
            key, value = part.split('=', 1)
            record[key] = value
    return record

def read_log_file(log_path):
    """Read and parse a log file into a DataFrame."""
    if not log_path.exists():
        return pd.DataFrame()
    
    records = []
    with open(log_path, 'r') as f:
        for line in f:
            record = parse_log_line(line)
            if record:
                records.append(record)
    
    return pd.DataFrame(records)

def format_status(status):
    """Return HTML-formatted status with color."""
    colors = {
        'REVOKED': '#dc3545',  # red
        'ISSUED': '#28a745',   # green
        'FAILED': '#ffc107',   # yellow
    }
    color = colors.get(status, '#6c757d')
    return f'<span style="color: {color}; font-weight: bold;">{status}</span>'

## Revocation Log

In [None]:
def display_revocations():
    df = read_log_file(REVOCATION_LOG)
    if df.empty:
        print("No revocation records found.")
        print(f"Log file: {REVOCATION_LOG}")
        return
    
    # Select and order columns
    columns = ['timestamp', 'PKI', 'CA', 'SERIAL', 'CN', 'STATUS', 'REASON', 'EVENT']
    available = [c for c in columns if c in df.columns]
    df_display = df[available].copy()
    
    # Sort by timestamp descending
    df_display = df_display.sort_values('timestamp', ascending=False)
    
    print(f"Total Revocations: {len(df_display)}")
    display(df_display.head(20))

display_revocations()

## Issuance Log

In [None]:
def display_issuances():
    df = read_log_file(ISSUANCE_LOG)
    if df.empty:
        print("No issuance records found.")
        print(f"Log file: {ISSUANCE_LOG}")
        return
    
    columns = ['timestamp', 'PKI', 'CA', 'SERIAL', 'CN', 'STATUS', 'EVENT']
    available = [c for c in columns if c in df.columns]
    df_display = df[available].copy()
    df_display = df_display.sort_values('timestamp', ascending=False)
    
    print(f"Total Issuances: {len(df_display)}")
    display(df_display.head(20))

display_issuances()

## Summary Statistics

In [None]:
def display_summary():
    revocations = read_log_file(REVOCATION_LOG)
    issuances = read_log_file(ISSUANCE_LOG)
    
    print("=" * 60)
    print("CERTIFICATE ACTIVITY SUMMARY")
    print("=" * 60)
    
    print(f"\nTotal Certificates Issued:  {len(issuances)}")
    print(f"Total Certificates Revoked: {len(revocations)}")
    
    if not revocations.empty and 'PKI' in revocations.columns:
        print("\nRevocations by PKI Type:")
        print(revocations['PKI'].value_counts().to_string())
    
    if not revocations.empty and 'CA' in revocations.columns:
        print("\nRevocations by CA:")
        print(revocations['CA'].value_counts().to_string())
    
    if not revocations.empty and 'STATUS' in revocations.columns:
        print("\nRevocation Status:")
        print(revocations['STATUS'].value_counts().to_string())

display_summary()

## Live Dashboard

Auto-refreshing dashboard that polls logs every few seconds.

In [None]:
def live_dashboard(refresh_interval=5, duration=120):
    """
    Display a live dashboard that auto-refreshes.
    
    Args:
        refresh_interval: Seconds between refreshes
        duration: Total duration to run (seconds)
    """
    start_time = time.time()
    
    while time.time() - start_time < duration:
        clear_output(wait=True)
        
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f"Certificate Revocation Lab Dashboard - {now}")
        print(f"Auto-refresh every {refresh_interval}s (running for {duration}s)")
        print("=" * 70)
        
        # Read logs
        revocations = read_log_file(REVOCATION_LOG)
        issuances = read_log_file(ISSUANCE_LOG)
        
        # Summary
        print(f"\nðŸ“Š SUMMARY")
        print(f"   Certificates Issued:  {len(issuances)}")
        print(f"   Certificates Revoked: {len(revocations)}")
        
        # Recent Activity
        print(f"\nðŸ“‹ RECENT REVOCATIONS (last 5)")
        print("-" * 70)
        if not revocations.empty:
            recent = revocations.sort_values('timestamp', ascending=False).head(5)
            for _, row in recent.iterrows():
                ts = row.get('timestamp', 'N/A')[:19]
                pki = row.get('PKI', 'N/A')
                ca = row.get('CA', 'N/A')
                serial = row.get('SERIAL', 'N/A')[:16]
                cn = row.get('CN', 'N/A')[:30]
                status = row.get('STATUS', 'N/A')
                print(f"   {ts} | {pki:8} | {ca:12} | {serial:16} | {status}")
        else:
            print("   No revocations recorded yet.")
        
        print(f"\nðŸ“‹ RECENT ISSUANCES (last 5)")
        print("-" * 70)
        if not issuances.empty:
            recent = issuances.sort_values('timestamp', ascending=False).head(5)
            for _, row in recent.iterrows():
                ts = row.get('timestamp', 'N/A')[:19]
                pki = row.get('PKI', 'N/A')
                ca = row.get('CA', 'N/A')
                serial = row.get('SERIAL', 'N/A')[:16]
                status = row.get('STATUS', 'N/A')
                print(f"   {ts} | {pki:8} | {ca:12} | {serial:16} | {status}")
        else:
            print("   No issuances recorded yet.")
        
        print("\n" + "=" * 70)
        elapsed = int(time.time() - start_time)
        remaining = duration - elapsed
        print(f"Remaining: {remaining}s | Press interrupt (â¬›) to stop")
        
        time.sleep(refresh_interval)
    
    print("\nDashboard stopped.")

# Run dashboard for 2 minutes
live_dashboard(refresh_interval=5, duration=120)

## Certificate Lookup

Look up a certificate by serial number or CN.

In [None]:
def lookup_certificate(search_term):
    """Search for a certificate in logs by serial or CN."""
    revocations = read_log_file(REVOCATION_LOG)
    issuances = read_log_file(ISSUANCE_LOG)
    
    results = []
    
    for df, log_type in [(issuances, 'ISSUED'), (revocations, 'REVOKED')]:
        if df.empty:
            continue
        
        # Search in SERIAL and CN columns
        mask = pd.Series([False] * len(df))
        if 'SERIAL' in df.columns:
            mask |= df['SERIAL'].str.contains(search_term, case=False, na=False)
        if 'CN' in df.columns:
            mask |= df['CN'].str.contains(search_term, case=False, na=False)
        
        matches = df[mask].copy()
        if not matches.empty:
            matches['ACTION'] = log_type
            results.append(matches)
    
    if results:
        combined = pd.concat(results, ignore_index=True)
        combined = combined.sort_values('timestamp')
        return combined
    else:
        return pd.DataFrame()

In [None]:
# Example: Search for a certificate
# Change this to search for a specific certificate
SEARCH_TERM = "testdevice"

results = lookup_certificate(SEARCH_TERM)
if not results.empty:
    print(f"Found {len(results)} records matching '{SEARCH_TERM}':")
    display(results)
else:
    print(f"No records found matching '{SEARCH_TERM}'")