# 04 - VSM Mapping: Viable System Model Analysis

Map OSS project metrics to Stafford Beer's Viable System Model (VSM) subsystems.

**VSM Subsystems:**
- **S1 (Operations)**: Primary activities - code contributions, commits, releases
- **S2 (Coordination)**: Anti-oscillatory mechanisms - CI/CD, coding standards, PR reviews
- **S3 (Control)**: Resource allocation - maintainer decisions, issue triage
- **S4 (Intelligence)**: Environmental scanning - roadmaps, community feedback, security alerts
- **S5 (Policy)**: Identity and governance - GOVERNANCE.md, CODE_OF_CONDUCT, core values

**Research Questions:**
- How do Stadium projects differ from Federation/Club in VSM structure?
- Which subsystems are under-developed in vulnerable projects?
- Can VSM analysis predict project sustainability?

## Setup

In [None]:
import json
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Add src to path
sys.path.insert(0, '../src')
from analysis.entropy_calculation import EntropyCalculator

# Set plotting style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['font.size'] = 11

print("✅ Setup complete!")

## 1. Load Project Data

In [None]:
# Load all collected project data
data_dir = Path("../data/raw")
projects = {}

for file_path in data_dir.glob("*_data.json"):
    with open(file_path, 'r') as f:
        data = json.load(f)
        repo_name = data['repository']['full_name']
        projects[repo_name] = data

print(f"Loaded {len(projects)} projects:")
for name in projects:
    print(f"  - {name}")
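Because `VSMMapper` reads every section with `.get()` defaults, a record that lacks a section silently scores zero on the related metrics instead of failing. A quick structural check on the loaded records can catch that early. This is a minimal sketch: the key paths are read off the mapper methods in the next section, while the helper `missing_paths` and the `example_record` values are hypothetical illustrations, not the collector's actual output.

```python
# Hypothetical schema check; key names mirror what VSMMapper reads, values are made up.
REQUIRED_PATHS = [
    ("repository", "full_name"),
    ("contributors",),
    ("recent_commits",),
    ("pull_requests", "pull_requests"),
    ("issues", "issues"),
    ("governance_files",),
]


def missing_paths(record: dict, paths=REQUIRED_PATHS) -> list:
    """Return the dotted key paths that are absent from a project record."""
    missing = []
    for path in paths:
        node = record
        for key in path:
            if not isinstance(node, dict) or key not in node:
                missing.append(".".join(path))
                break
            node = node[key]
    return missing


# Illustrative record only; real *_data.json files may contain many more fields.
example_record = {
    "repository": {"full_name": "example-org/example-repo", "stargazers_count": 120,
                   "forks_count": 15, "license": {"spdx_id": "MIT"}},
    "contributors": [{"login": "alice", "contributions": 90},
                     {"login": "bob", "contributions": 10}],
    "recent_commits": [{"author": "alice", "additions": 12, "deletions": 3}],
    "pull_requests": {"pull_requests": [{"review_count": 1}],
                      "statistics": {"avg_time_to_merge": 30.0}},
    "issues": {"issues": [{"labels": ["bug"]}],
               "statistics": {"avg_time_to_close": 100.0}},
    "maintainers": {"statistics": {"active_maintainers_6mo": 2}},
    "governance_files": {"CONTRIBUTING.md": True, "SECURITY.md": False},
}

print(missing_paths(example_record))      # []
print(missing_paths({"repository": {}}))  # every path is reported
```

In the notebook, `{name: missing_paths(d) for name, d in projects.items()}` would list the gaps per repository.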

## 2. Define VSM Metric Mapping

Map GitHub metrics to VSM subsystems.
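Two of the mappings rely on concentration measures from `EntropyCalculator` (in `analysis.entropy_calculation`, not shown here): normalized contributor entropy for S1 and the Gini coefficient for S3. The sketch below gives the textbook definitions so the scores can be interpreted; whether the project module computes them exactly this way (natural log, treatment of zero counts) is an assumption.

```python
import math


def normalized_shannon_entropy(contributions: list) -> float:
    """Shannon entropy of contribution shares divided by log(n), so the result lies in [0, 1]."""
    positive = [c for c in contributions if c > 0]
    n = len(positive)
    if n <= 1:
        return 0.0  # a single contributor (or none) means no spread at all
    total = sum(positive)
    h = -sum((c / total) * math.log(c / total) for c in positive)
    return h / math.log(n)


def gini_coefficient(values: list) -> float:
    """Gini coefficient: 0 = perfectly equal, (n-1)/n = one contributor does everything."""
    xs = sorted(v for v in values if v >= 0)
    n = len(xs)
    total = sum(xs)
    if n == 0 or total == 0:
        return 0.0
    # Rank-weighted form: G = 2 * sum(i * x_i) / (n * sum(x)) - (n + 1) / n, i = 1..n ascending
    weighted = sum((i + 1) * x for i, x in enumerate(xs))
    return 2 * weighted / (n * total) - (n + 1) / n


print(gini_coefficient([10, 10, 10, 10]), gini_coefficient([0, 0, 0, 100]))  # 0.0 0.75
```

With these definitions, the S3 term `(1 - gini) * 20` gives 20 points to an evenly shared project and 5 points to a four-person project where one person makes every contribution.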

In [None]:
class VSMMapper:
    """
    Map OSS project metrics to Viable System Model subsystems.
    """
    
    def __init__(self):
        self.entropy_calc = EntropyCalculator()
    
    def calculate_s1_operations(self, project_data: dict) -> dict:
        """
        S1 (Operations): Primary productive activities.
        
        Metrics:
        - Commit frequency and volume
        - Contributor activity
        - Release cadence
        """
        commits = project_data.get('recent_commits', [])
        contributors = project_data.get('contributors', [])
        
        # Commit activity
        commit_count = len(commits)
        
        # Unique committers in the recent period. The author field may be a plain
        # login string or a GitHub-style user object, so normalise to a hashable key.
        committers = set()
        for c in commits:
            author = c.get('author')
            if isinstance(author, dict):
                author = author.get('login')
            if author:
                committers.add(author)
        
        # Contributor entropy (distribution of work)
        entropy, normalized = self.entropy_calc.contributor_entropy(contributors)
        
        # Code changes volume
        total_additions = sum(c.get('additions', 0) for c in commits)
        total_deletions = sum(c.get('deletions', 0) for c in commits)
        
        return {
            'commit_count': commit_count,
            'unique_committers': len(committers),
            'total_contributors': len(contributors),
            'contributor_entropy': normalized,
            'code_additions': total_additions,
            'code_deletions': total_deletions,
            'code_churn': total_additions + total_deletions,
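            # S1 score (0-100): the commit term adds 50 points per 365 recent commits,
            # each unique recent committer adds 5 points, and the total is capped at 100
            's1_score': min(100, (commit_count / 365 * 50) + (len(committers) * 5))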
        }
    
    def calculate_s2_coordination(self, project_data: dict) -> dict:
        """
        S2 (Coordination): Anti-oscillatory mechanisms.
        
        Metrics:
        - PR review coverage
        - Contribution standards (CONTRIBUTING.md, CODEOWNERS)
        - Average time to merge
        """
        prs = project_data.get('pull_requests', {}).get('pull_requests', [])
        pr_stats = project_data.get('pull_requests', {}).get('statistics', {})
        governance = project_data.get('governance_files', {})
        
        # PR review metrics
        reviewed_prs = sum(1 for pr in prs if pr.get('review_count', 0) > 0)
        review_rate = reviewed_prs / len(prs) if prs else 0
        
        # Average time to merge, in hours (lower suggests smoother coordination).
        # Missing merge data is kept as None so it earns no points instead of full points.
        avg_merge_time = pr_stats.get('avg_time_to_merge')
        
        # Standards presence
        has_contributing = governance.get('CONTRIBUTING.md', False)
        has_codeowners = governance.get('.github/CODEOWNERS', False)
        
        # Calculate coordination score
        coord_score = 0
        coord_score += review_rate * 40  # 40 points for review coverage
        coord_score += 20 if has_contributing else 0
        coord_score += 20 if has_codeowners else 0
        if prs and avg_merge_time is not None:
            # 20 points under 3 days (72 h), 10 points under 1 week (168 h)
            coord_score += 20 if avg_merge_time < 72 else (10 if avg_merge_time < 168 else 0)
        
        return {
            'total_prs': len(prs),
            'reviewed_prs': reviewed_prs,
            'review_rate': review_rate,
            'avg_merge_time_hrs': avg_merge_time,
            'has_contributing_guide': has_contributing,
            'has_codeowners': has_codeowners,
            's2_score': min(100, coord_score)
        }
    
    def calculate_s3_control(self, project_data: dict) -> dict:
        """
        S3 (Control): Internal resource allocation and optimization.
        
        Metrics:
        - Active maintainers in the last 6 months
        - Issue handling: time to close and label coverage (triage)
        - Concentration of contributions (Gini), a proxy for how control is distributed
        """
        maintainers = project_data.get('maintainers', {}).get('statistics', {})
        issues = project_data.get('issues', {}).get('issues', [])
        issue_stats = project_data.get('issues', {}).get('statistics', {})
        
        active_maintainers = maintainers.get('active_maintainers_6mo', 0)
        avg_close_time = issue_stats.get('avg_time_to_close')  # hours; None if unavailable
        
        # Issue triage (labeled issues)
        labeled_issues = sum(1 for i in issues if i.get('labels'))
        label_rate = labeled_issues / len(issues) if issues else 0
        
        # Control concentration (Gini of contributions)
        contributors = project_data.get('contributors', [])
        contributions = [c.get('contributions', 0) for c in contributors]
        gini = self.entropy_calc.gini_coefficient(contributions)
        
        # Calculate control score
        control_score = 0
        control_score += min(30, active_maintainers * 10)  # Up to 30 for maintainers
        if issues and avg_close_time is not None:
            # 30 points under 1 week (168 h), 15 points under 30 days (720 h)
            control_score += 30 if avg_close_time < 168 else (15 if avg_close_time < 720 else 0)
        control_score += label_rate * 20
        control_score += (1 - gini) * 20  # Lower Gini = better distributed control
        
        return {
            'active_maintainers': active_maintainers,
            'total_issues': len(issues),
            'avg_close_time_hrs': avg_close_time,
            'label_rate': label_rate,
            'control_concentration': gini,
            's3_score': min(100, control_score)
        }
    
    def calculate_s4_intelligence(self, project_data: dict) -> dict:
        """
        S4 (Intelligence): Environmental scanning and adaptation.
        
        Metrics:
        - External engagement (stars, forks, watchers)
        - Security policy presence (SECURITY.md)
        """
        repo = project_data.get('repository', {})
        governance = project_data.get('governance_files', {})
        
        stars = repo.get('stargazers_count', 0)
        forks = repo.get('forks_count', 0)
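        # GitHub's `watchers_count` mirrors the star count; real watchers are reported
        # as `subscribers_count`, which is used here when the collector stored it.
        watchers = repo.get('subscribers_count', repo.get('watchers_count', 0))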
        
        # Fork ratio indicates external adoption/adaptation
        fork_ratio = forks / stars if stars > 0 else 0
        
        # Security awareness
        has_security = governance.get('SECURITY.md', False)
        
        # Calculate intelligence score
        intel_score = 0
        intel_score += min(30, np.log10(stars + 1) * 10)  # Logarithmic star impact
        intel_score += min(20, fork_ratio * 100)  # Fork engagement
        intel_score += 30 if has_security else 0
        intel_score += min(20, np.log10(watchers + 1) * 10)
        
        return {
            'stars': stars,
            'forks': forks,
            'watchers': watchers,
            'fork_ratio': fork_ratio,
            'has_security_policy': has_security,
            's4_score': min(100, intel_score)
        }
    
    def calculate_s5_policy(self, project_data: dict) -> dict:
        """
        S5 (Policy): Identity, values, and ultimate authority.
        
        Metrics:
        - Governance documentation (GOVERNANCE.md, MAINTAINERS.md or CONTRIBUTORS.md)
        - Code of conduct
        - License and overall completeness of governance files
        """
        governance = project_data.get('governance_files', {})
        repo = project_data.get('repository', {})
        
        has_governance = governance.get('GOVERNANCE.md', False)
        has_coc = governance.get('CODE_OF_CONDUCT.md', False)
        has_maintainers = governance.get('MAINTAINERS.md', False) or governance.get('CONTRIBUTORS.md', False)
        has_license = repo.get('license') is not None
        
        # Governance completeness
        gov_files_count = sum([
            has_governance, has_coc, has_maintainers, has_license,
            governance.get('CONTRIBUTING.md', False),
            governance.get('SECURITY.md', False)
        ])
        
        # Calculate policy score
        policy_score = 0
        policy_score += 25 if has_governance else 0
        policy_score += 25 if has_coc else 0
        policy_score += 20 if has_maintainers else 0
        policy_score += 15 if has_license else 0
        policy_score += gov_files_count * 2.5  # Bonus for completeness
        
        return {
            'has_governance': has_governance,
            'has_code_of_conduct': has_coc,
            'has_maintainers_file': has_maintainers,
            'has_license': has_license,
            'governance_completeness': gov_files_count / 6,
            's5_score': min(100, policy_score)
        }
    
    def calculate_full_vsm(self, project_data: dict) -> dict:
        """
        Calculate complete VSM profile for a project.
        """
        s1 = self.calculate_s1_operations(project_data)
        s2 = self.calculate_s2_coordination(project_data)
        s3 = self.calculate_s3_control(project_data)
        s4 = self.calculate_s4_intelligence(project_data)
        s5 = self.calculate_s5_policy(project_data)
        
        # Overall viability score (weighted average)
        viability_score = (
            s1['s1_score'] * 0.25 +
            s2['s2_score'] * 0.20 +
            s3['s3_score'] * 0.25 +
            s4['s4_score'] * 0.15 +
            s5['s5_score'] * 0.15
        )
        
        return {
            'S1_Operations': s1,
            'S2_Coordination': s2,
            'S3_Control': s3,
            'S4_Intelligence': s4,
            'S5_Policy': s5,
            'viability_score': viability_score,
            'subsystem_scores': {
                'S1': s1['s1_score'],
                'S2': s2['s2_score'],
                'S3': s3['s3_score'],
                'S4': s4['s4_score'],
                'S5': s5['s5_score']
            }
        }

# Initialize mapper
vsm_mapper = VSMMapper()
print("✅ VSM Mapper initialized")
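The viability score in `calculate_full_vsm` is a weighted average with weights 0.25 / 0.20 / 0.25 / 0.15 / 0.15 for S1 to S5. A small worked example makes the arithmetic concrete; the helper name `weighted_viability` and the example scores are illustrative only.

```python
import math

# Weights copied from calculate_full_vsm; they sum to 1.
VSM_WEIGHTS = {"S1": 0.25, "S2": 0.20, "S3": 0.25, "S4": 0.15, "S5": 0.15}


def weighted_viability(scores: dict, weights: dict = VSM_WEIGHTS) -> float:
    """Weighted average of subsystem scores (0-100); rejects weights that do not sum to 1."""
    if not math.isclose(sum(weights.values()), 1.0):
        raise ValueError("weights must sum to 1")
    return sum(scores[k] * w for k, w in weights.items())


# Hypothetical project: busy (S1 = 80) but with almost no governance (S5 = 20).
example = {"S1": 80, "S2": 40, "S3": 60, "S4": 50, "S5": 20}
# 80*0.25 + 40*0.20 + 60*0.25 + 50*0.15 + 20*0.15 = 20 + 8 + 15 + 7.5 + 3 = 53.5
print(round(weighted_viability(example), 2))
```

A weighted average is compensatory: a strong S1 can offset a weak S5, whereas Beer treats every subsystem as necessary for viability. That is why the per-subsystem weakness check in Section 6 is worth reading alongside the single score.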

## 3. Calculate VSM Profiles

In [None]:
# Calculate VSM profiles for all projects
vsm_profiles = {}

for repo_name, data in projects.items():
    vsm_profiles[repo_name] = vsm_mapper.calculate_full_vsm(data)
    
    print(f"\n{'='*60}")
    print(f"{repo_name}")
    print(f"{'='*60}")
    print(f"Viability Score: {vsm_profiles[repo_name]['viability_score']:.1f}/100")
    print(f"\nSubsystem Scores:")
    for subsystem, score in vsm_profiles[repo_name]['subsystem_scores'].items():
        bar = '█' * int(score / 5) + '░' * (20 - int(score / 5))
        print(f"  {subsystem}: {bar} {score:.1f}")
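The S2 and S3 scores award time-based points in tiers: under 72 h / 168 h for average merge time, and under 168 h / 720 h for average issue close time. Pulling the cut-offs into one helper makes them easy to inspect and tune. This is a sketch: `tiered_points`, `MERGE_TIERS` and `CLOSE_TIERS` are hypothetical names, and the helper deliberately scores missing data as zero.

```python
from typing import Optional, Sequence, Tuple


def tiered_points(hours: Optional[float], tiers: Sequence[Tuple[float, float]]) -> float:
    """Return the points of the first tier whose upper bound (hours) exceeds `hours`.

    `tiers` holds (upper_bound_hours, points) pairs sorted by bound.
    Missing data (None) and values past the last bound score 0.
    """
    if hours is None:
        return 0.0
    for bound, points in tiers:
        if hours < bound:
            return points
    return 0.0


MERGE_TIERS = [(72, 20), (168, 10)]   # S2: under 3 days -> 20, under 1 week -> 10
CLOSE_TIERS = [(168, 30), (720, 15)]  # S3: under 1 week -> 30, under 30 days -> 15

for h in (48, 100, 200, None):
    print(h, tiered_points(h, MERGE_TIERS))
```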

## 4. VSM Radar Charts

In [None]:
def plot_vsm_radar(vsm_profile: dict, title: str, ax=None):
    """
    Create radar chart for VSM subsystem scores.
    """
    categories = ['S1\nOperations', 'S2\nCoordination', 'S3\nControl', 
                  'S4\nIntelligence', 'S5\nPolicy']
    scores = [vsm_profile['subsystem_scores'][f'S{i}'] for i in range(1, 6)]
    
    # Close the radar chart
    scores += scores[:1]
    angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
    angles += angles[:1]
    
    if ax is None:
        fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))
    
    # Plot data
    ax.plot(angles, scores, 'o-', linewidth=2, color='steelblue')
    ax.fill(angles, scores, alpha=0.25, color='steelblue')
    
    # Set category labels
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(categories, size=10)
    
    # Set y-axis limits
    ax.set_ylim(0, 100)
    ax.set_yticks([20, 40, 60, 80, 100])
    ax.set_yticklabels(['20', '40', '60', '80', '100'], size=8)
    
    ax.set_title(f"{title}\nViability: {vsm_profile['viability_score']:.1f}/100", 
                 size=12, fontweight='bold', pad=20)
    
    return ax

# Create radar charts for all projects
n_projects = len(vsm_profiles)
cols = min(3, n_projects)
rows = (n_projects + cols - 1) // cols

fig = plt.figure(figsize=(6 * cols, 6 * rows))

for i, (repo_name, profile) in enumerate(vsm_profiles.items()):
    ax = fig.add_subplot(rows, cols, i + 1, polar=True)
    plot_vsm_radar(profile, repo_name.split('/')[-1], ax)

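plt.tight_layout()
diagram_dir = Path('../docs/diagrams')
diagram_dir.mkdir(parents=True, exist_ok=True)  # savefig does not create missing folders
plt.savefig(diagram_dir / 'vsm_radar_charts.png', dpi=150, bbox_inches='tight')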
plt.show()

print("\n✅ Radar charts saved to docs/diagrams/vsm_radar_charts.png")

## 5. VSM Comparison Analysis

In [None]:
# Create comparison DataFrame
comparison_data = []

for repo_name, profile in vsm_profiles.items():
    row = {
        'repository': repo_name,
        'viability_score': profile['viability_score'],
        **profile['subsystem_scores']
    }
    
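    # Add contributor-based classification (e.g. Stadium, Federation, Club),
    # reusing the mapper's EntropyCalculator instead of creating one per project
    classification = vsm_mapper.entropy_calc.classify_project(projects[repo_name].get('contributors', []))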
    row['classification'] = classification['classification']
    row['stadium_score'] = classification['stadium_score']
    
    comparison_data.append(row)

df_vsm = pd.DataFrame(comparison_data)

# Display sorted by viability
print("\nVSM Comparison (sorted by viability score):")
print("="*80)
display(df_vsm.sort_values('viability_score', ascending=False))

In [None]:
# Heatmap of VSM scores
if len(df_vsm) > 1:
    fig, ax = plt.subplots(figsize=(10, max(4, len(df_vsm) * 0.8)))
    
    # Prepare data for heatmap
    heatmap_data = df_vsm.set_index('repository')[['S1', 'S2', 'S3', 'S4', 'S5']]
    
    # Create heatmap
    sns.heatmap(heatmap_data, annot=True, fmt='.0f', cmap='RdYlGn',
                vmin=0, vmax=100, ax=ax, cbar_kws={'label': 'Score'})
    
    ax.set_xlabel('VSM Subsystem')
    ax.set_ylabel('Project')
    ax.set_title('VSM Subsystem Scores Comparison', fontsize=14, fontweight='bold')
    
    plt.tight_layout()
    Path('../docs/diagrams').mkdir(parents=True, exist_ok=True)  # savefig does not create folders
    plt.savefig('../docs/diagrams/vsm_heatmap.png', dpi=150, bbox_inches='tight')
    plt.show()
else:
    print("Need more projects for comparison heatmap")

## 6. VSM Weaknesses Analysis

In [None]:
def identify_vsm_weaknesses(vsm_profile: dict, threshold: float = 50) -> list:
    """
    Identify weak subsystems (below threshold).
    """
    weaknesses = []
    subsystem_names = {
        'S1': 'Operations (S1) - Primary productive activities',
        'S2': 'Coordination (S2) - Anti-oscillatory mechanisms',
        'S3': 'Control (S3) - Resource allocation',
        'S4': 'Intelligence (S4) - Environmental scanning',
        'S5': 'Policy (S5) - Governance and identity'
    }
    
    for subsystem, score in vsm_profile['subsystem_scores'].items():
        if score < threshold:
            weaknesses.append({
                'subsystem': subsystem,
                'name': subsystem_names[subsystem],
                'score': score,
                'gap': threshold - score
            })
    
    return sorted(weaknesses, key=lambda x: x['gap'], reverse=True)

# Analyze weaknesses for each project
print("\n" + "="*70)
print("VSM WEAKNESSES ANALYSIS (threshold: 50)")
print("="*70)

for repo_name, profile in vsm_profiles.items():
    weaknesses = identify_vsm_weaknesses(profile)
    
    print(f"\n{repo_name}:")
    if weaknesses:
        for w in weaknesses:
            print(f"  ⚠️  {w['name']}: {w['score']:.1f} (gap: {w['gap']:.1f})")
    else:
        print("  ✅ All subsystems at or above threshold")
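The fixed threshold of 50 answers "which subsystems are weak in absolute terms". For the research question about which subsystems are under-developed across projects, it can also help to count each project's single lowest subsystem. A minimal sketch with pandas `idxmin(axis=1)`, which returns the first column on ties; the `toy` frame is made-up data with the same columns as `df_vsm`.

```python
import pandas as pd

SUBSYSTEMS = ["S1", "S2", "S3", "S4", "S5"]


def weakest_subsystem_counts(df: pd.DataFrame) -> pd.Series:
    """Count how often each subsystem is a project's lowest-scoring one."""
    weakest = df[SUBSYSTEMS].idxmin(axis=1)  # column label of the row minimum
    return weakest.value_counts()


# Illustrative scores only, not real project results.
toy = pd.DataFrame({
    "repository": ["a/x", "b/y", "c/z"],
    "S1": [80, 30, 70], "S2": [40, 60, 65], "S3": [70, 50, 90],
    "S4": [55, 45, 60], "S5": [20, 70, 10],
})
print(weakest_subsystem_counts(toy))
```

Applied to the real results, `weakest_subsystem_counts(df_vsm)` would show whether one subsystem (S5, for example) is the recurring gap.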

## 7. VSM-Classification Correlation

In [None]:
# Compare VSM scores across contributor-based classifications (Stadium, Federation, Club)
if len(df_vsm) >= 3:
    print("\nVSM Score Statistics by Classification:")
    print("="*60)
    
    for col in ['viability_score', 'S1', 'S2', 'S3', 'S4', 'S5']:
        print(f"\n{col}:")
        stats = df_vsm.groupby('classification')[col].agg(['mean', 'std', 'count'])
        print(stats.round(2))
else:
    print("Need more projects for classification correlation analysis")
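The grouped means above compare categories; the continuous `stadium_score` column also allows a rank correlation with each VSM score. The sketch computes Spearman's rho as the Pearson correlation of average ranks, using only `DataFrame.rank()` and `DataFrame.corr()`. The helper name and `toy` data are hypothetical, and the sign of any real result depends on how `classify_project` scales `stadium_score`, which is not shown here. With only a handful of projects these coefficients are descriptive, not evidence of significance.

```python
import pandas as pd


def spearman_with(df: pd.DataFrame, target: str, columns: list) -> pd.Series:
    """Spearman rank correlation of each column with `target` (Pearson on average ranks)."""
    ranked = df[[target] + columns].rank()  # average ranks, so ties are handled
    return ranked.corr()[target].drop(target)


# Made-up values for illustration.
toy = pd.DataFrame({
    "stadium_score": [0.1, 0.4, 0.6, 0.9],
    "viability_score": [70, 65, 50, 40],
    "S5": [60, 62, 30, 10],
})
rho = spearman_with(toy, "stadium_score", ["viability_score", "S5"])
print(rho.round(2))
```

On the notebook's data this would be `spearman_with(df_vsm, 'stadium_score', ['viability_score', 'S1', 'S2', 'S3', 'S4', 'S5'])`.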

## 8. Export Results

In [None]:
# Save VSM analysis results
output_path = Path('../data/processed/vsm_analysis.csv')
output_path.parent.mkdir(parents=True, exist_ok=True)

df_vsm.to_csv(output_path, index=False)
print(f"✅ VSM analysis saved to {output_path}")

# Save detailed profiles as JSON
json_path = Path('../data/processed/vsm_profiles.json')
with open(json_path, 'w') as f:
    json.dump(vsm_profiles, f, indent=2, default=str)
print(f"✅ Detailed profiles saved to {json_path}")

## Key Findings

**Interpretation Guide:**
- **High S1, Low S2**: Active development but poor coordination → Risk of conflicts
- **High S3, Low S1**: Strong control but low activity → Potential stagnation
- **Low S5**: Missing governance → Identity crisis risk
- **Stadium + Low S5**: "Bus factor" risk - dependent on key maintainer
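The interpretation guide can be applied mechanically to each profile. The guide does not fix numeric cut-offs, so the `high=60` and `low=40` defaults below are illustrative assumptions, as is the exact label `"Stadium"` for the classification string; `interpret_profile` is a hypothetical helper.

```python
def interpret_profile(scores: dict, classification: str,
                      high: float = 60, low: float = 40) -> list:
    """Return the interpretation-guide patterns that match one project's subsystem scores.

    `high` and `low` are illustrative thresholds, not values taken from the guide.
    """
    flags = []
    if scores["S1"] >= high and scores["S2"] < low:
        flags.append("High S1, Low S2: active development, weak coordination (conflict risk)")
    if scores["S3"] >= high and scores["S1"] < low:
        flags.append("High S3, Low S1: strong control, little activity (stagnation risk)")
    if scores["S5"] < low:
        flags.append("Low S5: missing governance (identity risk)")
        if classification == "Stadium":  # assumed label from classify_project
            flags.append("Stadium + Low S5: bus-factor risk")
    return flags


example_flags = interpret_profile({"S1": 85, "S2": 30, "S3": 50, "S4": 60, "S5": 20}, "Stadium")
for flag in example_flags:
    print(flag)
```

In the notebook, each row of `df_vsm` supplies the subsystem scores and the `classification` column.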

**Next Steps:**
1. Collect more projects to establish patterns
2. Correlate VSM scores with project outcomes
3. Develop intervention recommendations based on VSM gaps