diff --git a/.gitignore b/.gitignore index c9c1af1..cc7b84a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ __pycache__/ *.egg-info/ harmony-report.json + +# Generated visualizations from legacy mapper +semantic_map.html diff --git a/harmonizer/legacy_mapper.py b/harmonizer/legacy_mapper.py index 3c15829..d21a1df 100644 --- a/harmonizer/legacy_mapper.py +++ b/harmonizer/legacy_mapper.py @@ -9,6 +9,8 @@ import os import glob import subprocess +import json +from datetime import datetime from statistics import mean, stdev from collections import defaultdict from dataclasses import dataclass, field @@ -55,6 +57,68 @@ class RefactoringOpportunity: suggested_actions: List[str] = field(default_factory=list) +@dataclass +class GitCommitSnapshot: + """Semantic coordinates at a specific commit""" + + commit_hash: str + commit_date: datetime + author: str + coordinates: Tuple[float, float, float, float] # (L, J, P, W) + disharmony: float + + +@dataclass +class FunctionGenealogy: + """Evolution of a function over time""" + + function_name: str + file_path: str + snapshots: List[GitCommitSnapshot] = field(default_factory=list) + total_drift: float = 0.0 # Total semantic drift + drift_rate: float = 0.0 # Drift per commit + major_changes: List[Tuple[str, str, float]] = field(default_factory=list) # (hash, date, drift) + + +@dataclass +class SemanticDrift: + """Measure of semantic drift over time""" + + file_path: str + first_commit: str + last_commit: str + time_span_days: int + total_drift: float + drift_per_day: float + dimension_drifts: Dict[str, float] = field(default_factory=dict) # L, J, P, W individual drifts + stability_score: float = 1.0 # 1.0 = stable, 0.0 = highly volatile + + +@dataclass +class ArchitectureDoc: + """Documented architecture vs reality""" + + component_name: str + documented_purpose: str + documented_coordinates: Optional[Tuple[float, float, float, float]] + actual_coordinates: Tuple[float, float, float, float] + alignment_score: float # 0-1, how well docs match reality + discrepancies: List[str] = field(default_factory=list) + + +@dataclass +class ArchitecturalDebt: + """Estimated architectural debt""" + + file_path: str + debt_score: float # 0-1 + estimated_hours: float + estimated_cost_usd: float + debt_type: str # "High Disharmony", "God File", "Mixed Concerns", etc. 
+    priority: str  # CRITICAL, HIGH, MEDIUM, LOW
+    description: str
+
+
 class LegacyCodeMapper:
     """Advanced codebase semantic analysis"""
 
@@ -64,6 +128,11 @@ def __init__(self, codebase_path: str, quiet: bool = False):
         self.file_analyses: Dict[str, FileAnalysis] = {}
         self.architectural_smells: List[ArchitecturalSmell] = []
         self.refactoring_opportunities: List[RefactoringOpportunity] = []
+        self.function_genealogies: Dict[str, FunctionGenealogy] = {}
+        self.semantic_drifts: List[SemanticDrift] = []
+        self.architecture_docs: List[ArchitectureDoc] = []
+        self.architectural_debts: List[ArchitecturalDebt] = []
+        self.quiet = quiet
 
     def analyze_codebase(self, show_progress: bool = True) -> Dict:
         """Analyze entire codebase and generate comprehensive report"""
@@ -357,6 +426,623 @@ def generate_complexity_heatmap(self) -> str:
 
         return "\n".join(heatmap)
 
+    def analyze_git_history(self, max_commits: int = 50, show_progress: bool = True) -> bool:
+        """Analyze git history to track semantic drift"""
+        if show_progress and not self.quiet:
+            print(f"\nAnalyzing git history (last {max_commits} commits)...")
+
+        # Check if we're in a git repo
+        try:
+            subprocess.run(
+                ["git", "rev-parse", "--git-dir"],
+                cwd=self.codebase_path,
+                capture_output=True,
+                check=True,
+            )
+        except (subprocess.CalledProcessError, FileNotFoundError):
+            if show_progress and not self.quiet:
+                print("⚠️ Not a git repository - skipping history analysis")
+            return False
+
+        # Get commit history
+        try:
+            result = subprocess.run(
+                ["git", "log", f"-{max_commits}", "--pretty=format:%H|%ai|%an"],
+                cwd=self.codebase_path,
+                capture_output=True,
+                text=True,
+                check=True,
+            )
+            commits = [line.split("|") for line in result.stdout.strip().split("\n") if line]
+        except subprocess.CalledProcessError:
+            if show_progress and not self.quiet:
+                print("⚠️ Failed to get git history")
+            return False
+
+        if not commits:
+            return False
+
+        # Analyze each file's evolution
+        for file_path, current_analysis in self.file_analyses.items():
+            rel_path = os.path.relpath(file_path, self.codebase_path)
+            drift = self._analyze_file_history(rel_path, commits, current_analysis)
+            if drift:
+                self.semantic_drifts.append(drift)
+
+        if show_progress and not self.quiet:
+            print(f"✓ Analyzed {len(self.semantic_drifts)} files with git history")
+
+        return True
+
+    def _analyze_file_history(
+        self, rel_file_path: str, commits: List[List[str]], current_analysis: FileAnalysis
+    ) -> Optional[SemanticDrift]:
+        """Analyze how a single file evolved over time"""
+        snapshots = []
+
+        for commit_hash, commit_date_str, author in commits[:10]:  # Sample 10 commits
+            # Get file content at this commit
+            try:
+                result = subprocess.run(
+                    ["git", "show", f"{commit_hash}:{rel_file_path}"],
+                    cwd=self.codebase_path,
+                    capture_output=True,
+                    text=True,
+                    timeout=5,
+                )
+
+                if result.returncode != 0:
+                    continue  # File didn't exist at this commit
+
+                # Write to temp file and analyze
+                import tempfile
+                with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
+                    f.write(result.stdout)
+                    temp_path = f.name
+
+                try:
+                    # Analyze this version
+                    results = self.harmonizer.analyze_file(temp_path)
+                    if results:
+                        # Compute average coordinates
+                        all_coords = []
+                        all_disharmony = []
+
+                        for func_name, data in results.items():
+                            ice_result = data.get("ice_result", {})
+                            ice_components = ice_result.get("ice_components", {})
+                            execution_result = ice_components.get("execution")
+
+                            if execution_result:
+                                coords = execution_result.coordinates
+                                all_coords.append((coords.love,
+                                                   coords.justice, coords.power, coords.wisdom))
+
+                            all_disharmony.append(data.get("score", 0))
+
+                        if all_coords:
+                            avg_l = mean([c[0] for c in all_coords])
+                            avg_j = mean([c[1] for c in all_coords])
+                            avg_p = mean([c[2] for c in all_coords])
+                            avg_w = mean([c[3] for c in all_coords])
+
+                            snapshots.append(GitCommitSnapshot(
+                                commit_hash=commit_hash[:8],
+                                # git's %ai format is "YYYY-MM-DD HH:MM:SS +ZZZZ"; parse it directly
+                                commit_date=datetime.strptime(commit_date_str, "%Y-%m-%d %H:%M:%S %z"),
+                                author=author,
+                                coordinates=(avg_l, avg_j, avg_p, avg_w),
+                                disharmony=mean(all_disharmony) if all_disharmony else 0.0
+                            ))
+                finally:
+                    os.unlink(temp_path)
+
+            except Exception:
+                continue
+
+        if len(snapshots) < 2:
+            return None
+
+        # Calculate drift
+        first = snapshots[-1]  # Oldest
+        last = snapshots[0]  # Newest
+
+        # Euclidean distance in LJPW space
+        drift_l = last.coordinates[0] - first.coordinates[0]
+        drift_j = last.coordinates[1] - first.coordinates[1]
+        drift_p = last.coordinates[2] - first.coordinates[2]
+        drift_w = last.coordinates[3] - first.coordinates[3]
+
+        total_drift = (drift_l**2 + drift_j**2 + drift_p**2 + drift_w**2) ** 0.5
+
+        # Time span
+        time_span = (last.commit_date - first.commit_date).days
+        drift_per_day = total_drift / max(time_span, 1)
+
+        # Stability score (inverse of drift)
+        stability = max(0.0, 1.0 - total_drift)
+
+        return SemanticDrift(
+            file_path=rel_file_path,
+            first_commit=first.commit_hash,
+            last_commit=last.commit_hash,
+            time_span_days=time_span,
+            total_drift=total_drift,
+            drift_per_day=drift_per_day,
+            dimension_drifts={"L": drift_l, "J": drift_j, "P": drift_p, "W": drift_w},
+            stability_score=stability
+        )
+
+    def analyze_architecture_docs(self, docs_path: Optional[str] = None) -> bool:
+        """Compare documented architecture with actual implementation"""
+        if not docs_path:
+            # Look for common doc files
+            doc_files = []
+            for pattern in ["ARCHITECTURE.md", "docs/ARCHITECTURE.md", "README.md", "docs/README.md"]:
+                path = os.path.join(self.codebase_path, pattern)
+                if os.path.exists(path):
+                    doc_files.append(path)
+
+            if not doc_files:
+                if not self.quiet:
+                    print("⚠️ No architecture documentation found")
+                return False
+
+            docs_path = doc_files[0]
+
+        if not self.quiet:
+            print(f"\nAnalyzing architecture documentation: {os.path.basename(docs_path)}")
+
+        # Read documentation
+        try:
+            with open(docs_path, 'r') as f:
+                doc_content = f.read().lower()
+        except Exception as e:
+            if not self.quiet:
+                print(f"⚠️ Could not read documentation: {e}")
+            return False
+
+        # Extract component mentions and their documented purposes
+        # Look for patterns like "X handles Y" or "X is responsible for Y"
+        import re
+
+        for file_path, analysis in self.file_analyses.items():
+            filename = os.path.basename(file_path).replace('.py', '')
+
+            # Check if this component is documented
+            if filename.lower() not in doc_content:
+                continue
+
+            # Try to extract documented purpose
+            patterns = [
+                rf'{filename}\s+(?:handles|manages|provides|implements|is responsible for)\s+([^.]+)',
+                rf'`{filename}`[:\s]+([^.]+)',
+            ]
+
+            documented_purpose = None
+            for pattern in patterns:
+                match = re.search(pattern, doc_content, re.IGNORECASE)
+                if match:
+                    documented_purpose = match.group(1).strip()
+                    break
+
+            if not documented_purpose:
+                documented_purpose = "Mentioned but purpose unclear"
+
+            # Infer documented coordinates from purpose text
+            doc_coords = self._infer_coordinates_from_text(documented_purpose)
+
+            # Compare with actual
+            actual = analysis.coordinates
+
+            if doc_coords:
+                # Calculate alignment (inverse of distance)
+                distance = sum((doc_coords[i] - actual[i])**2 for i in range(4)) ** 0.5
+                alignment = max(0.0, 1.0 - distance)
+
+                discrepancies = []
+                if abs(doc_coords[0] - actual[0]) > 0.3:
+                    discrepancies.append(f"Love dimension mismatch: doc={doc_coords[0]:.2f} vs actual={actual[0]:.2f}")
+                if abs(doc_coords[1] - actual[1]) > 0.3:
+                    discrepancies.append(f"Justice dimension mismatch: doc={doc_coords[1]:.2f} vs actual={actual[1]:.2f}")
+                if abs(doc_coords[2] - actual[2]) > 0.3:
+                    discrepancies.append(f"Power dimension mismatch: doc={doc_coords[2]:.2f} vs actual={actual[2]:.2f}")
+                if abs(doc_coords[3] - actual[3]) > 0.3:
+                    discrepancies.append(f"Wisdom dimension mismatch: doc={doc_coords[3]:.2f} vs actual={actual[3]:.2f}")
+            else:
+                alignment = 0.5  # Unknown
+                discrepancies = ["Could not infer semantic coordinates from documentation"]
+
+            self.architecture_docs.append(ArchitectureDoc(
+                component_name=filename,
+                documented_purpose=documented_purpose,
+                documented_coordinates=doc_coords,
+                actual_coordinates=actual,
+                alignment_score=alignment,
+                discrepancies=discrepancies
+            ))
+
+        if not self.quiet:
+            print(f"✓ Compared {len(self.architecture_docs)} documented components with reality")
+
+        return True
+
+    def _infer_coordinates_from_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
+        """Infer LJPW coordinates from natural language description"""
+        text_lower = text.lower()
+
+        # Keywords for each dimension
+        love_keywords = ['connect', 'integrate', 'communicate', 'coordinate', 'collaborate', 'interface']
+        justice_keywords = ['validate', 'verify', 'check', 'ensure', 'enforce', 'correct']
+        power_keywords = ['create', 'delete', 'modify', 'update', 'execute', 'control', 'manage']
+        wisdom_keywords = ['analyze', 'compute', 'calculate', 'process', 'retrieve', 'query', 'understand']
+
+        l = sum(1 for kw in love_keywords if kw in text_lower)
+        j = sum(1 for kw in justice_keywords if kw in text_lower)
+        p = sum(1 for kw in power_keywords if kw in text_lower)
+        w = sum(1 for kw in wisdom_keywords if kw in text_lower)
+
+        total = l + j + p + w
+        if total == 0:
+            return None
+
+        # Normalize
+        return (l/total, j/total, p/total, w/total)
+
+    def estimate_architectural_debt(self, hourly_rate: float = 150.0):
+        """Estimate architectural debt in hours and dollars"""
+        if not self.quiet:
+            print(f"\nEstimating architectural debt (rate: ${hourly_rate}/hr)...")
+
+        for file_path, analysis in self.file_analyses.items():
+            rel_path = os.path.relpath(file_path, self.codebase_path)
+
+            # Calculate debt score (0-1)
+            debt_factors = []
+
+            # Factor 1: Disharmony
+            if analysis.avg_disharmony > 0.5:
+                debt_factors.append(analysis.avg_disharmony)
+
+            # Factor 2: Complexity (function count)
+            if analysis.function_count > 20:
+                debt_factors.append(min(1.0, analysis.function_count / 50))
+
+            # Factor 3: Semantic confusion
+            if analysis.dimension_spread < 0.2:
+                debt_factors.append(0.6)
+
+            if not debt_factors:
+                continue
+
+            debt_score = mean(debt_factors)
+
+            # Estimate hours based on debt factors
+            base_hours = 0
+            debt_type = []
+
+            if analysis.avg_disharmony > 0.7:
+                base_hours += analysis.function_count * 0.5  # 30 min per function to fix
+                debt_type.append("High Disharmony")
+
+            if analysis.function_count > 30:
+                base_hours += analysis.function_count * 0.3  # Refactoring time
+                debt_type.append("God File")
+
+            if analysis.dimension_spread < 0.2:
+                base_hours += 4  # Clarification and restructuring
+                debt_type.append("Semantic Confusion")
+
+            if base_hours == 0:
+                continue
+
+            # Priority based on impact
+            if debt_score > 0.8:
+                priority = "CRITICAL"
+            elif debt_score > 0.6:
+                priority = "HIGH"
+            elif debt_score > 0.4:
+                priority = "MEDIUM"
+            else:
+                priority = "LOW"
+
+            self.architectural_debts.append(ArchitecturalDebt(
+                file_path=rel_path,
+                debt_score=debt_score,
+                estimated_hours=base_hours,
+                estimated_cost_usd=base_hours * hourly_rate,
+                debt_type=" + ".join(debt_type),
+                priority=priority,
+                description=f"{analysis.function_count} functions, {analysis.avg_disharmony:.2f} avg disharmony"
+            ))
+
+        if not self.quiet:
+            total_hours = sum(d.estimated_hours for d in self.architectural_debts)
+            total_cost = sum(d.estimated_cost_usd for d in self.architectural_debts)
+            print(f"✓ Total debt: {total_hours:.1f} hours (${total_cost:,.0f})")
+
+    def generate_3d_visualization_data(self) -> Dict:
+        """Generate data for 3D visualization of codebase in LJPW space"""
+        data = {
+            "files": [],
+            "clusters": {},
+            "dimensions": ["Love", "Justice", "Power", "Wisdom"]
+        }
+
+        for file_path, analysis in self.file_analyses.items():
+            l, j, p, w = analysis.coordinates
+            rel_path = os.path.relpath(file_path, self.codebase_path)
+
+            file_data = {
+                "path": rel_path,
+                "coordinates": {"L": l, "J": j, "P": p, "W": w},
+                "dominant": analysis.dominant_dimension,
+                "disharmony": analysis.avg_disharmony,
+                "function_count": analysis.function_count,
+                "color": self._get_dimension_color(analysis.dominant_dimension)
+            }
+            data["files"].append(file_data)
+
+            # Add to cluster
+            if analysis.dominant_dimension not in data["clusters"]:
+                data["clusters"][analysis.dominant_dimension] = []
+            data["clusters"][analysis.dominant_dimension].append(file_data)
+
+        return data
+
+    def _get_dimension_color(self, dimension: str) -> str:
+        """Get color code for dimension"""
+        colors = {
+            "Love": "#FFD700",     # Gold
+            "Justice": "#4169E1",  # Royal Blue
+            "Power": "#DC143C",    # Crimson
+            "Wisdom": "#32CD32"    # Lime Green
+        }
+        return colors.get(dimension, "#808080")
+
+    def generate_semantic_map_ascii(self) -> str:
+        """Generate advanced ASCII semantic map showing codebase structure"""
+        if not self.file_analyses:
+            return "No files analyzed"
+
+        output = []
+        output.append("\n" + "=" * 90)
+        output.append("3D SEMANTIC SPACE MAP (LJPW Coordinates)")
+        output.append("=" * 90)
+
+        # Create 2D projection: Love-Justice (X) vs Power-Wisdom (Y)
+        output.append("\n        Power-Wisdom Axis (↑)")
+        output.append("  1.0 ┤")
+
+        # Create grid
+        grid_size = 20
+        grid = [[' ' for _ in range(grid_size)] for _ in range(grid_size)]
+        file_map = {}
+
+        for file_path, analysis in self.file_analyses.items():
+            l, j, p, w = analysis.coordinates
+
+            # Project to 2D: X = (L + J) / 2, Y = (P + W) / 2
+            x_val = (l + j) / 2.0
+            y_val = (p + w) / 2.0
+
+            # Map to grid coordinates
+            x = int(x_val * (grid_size - 1))
+            y = int(y_val * (grid_size - 1))
+
+            # Ensure within bounds
+            x = max(0, min(grid_size - 1, x))
+            y = max(0, min(grid_size - 1, y))
+
+            # Symbol based on dominant dimension
+            symbol = {
+                "Love": "♥",
+                "Justice": "⚖",
+                "Power": "⚡",
+                "Wisdom": "✦"
+            }.get(analysis.dominant_dimension, "·")
+
+            if grid[grid_size - 1 - y][x] == ' ':
+                grid[grid_size - 1 - y][x] = symbol
+                file_map[(y, x)] = os.path.basename(file_path)
+            else:
+                grid[grid_size - 1 - y][x] = "⚪"  # Multiple files
+
+        # Print grid
+        for i, row in enumerate(grid):
+            y_label = f"{1.0 - (i / grid_size):.1f}"
+            if i % 5 == 0:
+                output.append(f" {y_label:>4} ┤ {''.join(row)}")
+            else:
+                output.append(f"      │ {''.join(row)}")
+
+        output.append(f"  0.0 └{'─' * grid_size}")
(grid_size - 8)}1.0") + output.append(" Love-Justice Axis (β)") + + output.append("\nLEGEND:") + output.append(" β₯ Love-dominant β Justice-dominant") + output.append(" β‘ Power-dominant β Wisdom-dominant") + output.append(" βͺ Multiple files at same location") + + return "\n".join(output) + + def generate_drift_timeline(self) -> str: + """Generate timeline visualization of semantic drift""" + if not self.semantic_drifts: + return "No drift data available" + + output = [] + output.append("\n" + "=" * 90) + output.append("SEMANTIC DRIFT TIMELINE") + output.append("=" * 90) + + # Sort by drift amount + sorted_drifts = sorted(self.semantic_drifts, key=lambda x: x.total_drift, reverse=True)[:10] + + for drift in sorted_drifts: + output.append(f"\n{drift.file_path}") + + # Create drift bar + drift_normalized = min(1.0, drift.total_drift / 2.0) # Cap at 2.0 for visualization + bar_length = int(drift_normalized * 40) + bar = "β" * bar_length + "β" * (40 - bar_length) + + stability_icon = "β" if drift.stability_score > 0.7 else ("β " if drift.stability_score > 0.3 else "β β ") + + output.append(f" Drift: {bar} {drift.total_drift:.3f} {stability_icon}") + output.append(f" Time: {drift.time_span_days} days | Commits: {drift.first_commit}..{drift.last_commit}") + + # Show dimension-specific drift + dim_bars = [] + for dim, delta in drift.dimension_drifts.items(): + if abs(delta) > 0.1: + sign = "+" if delta > 0 else "" + dim_bars.append(f"{dim}{sign}{delta:.2f}") + + if dim_bars: + output.append(f" Changes: {' | '.join(dim_bars)}") + + return "\n".join(output) + + def generate_debt_breakdown(self) -> str: + """Generate detailed debt breakdown visualization""" + if not self.architectural_debts: + return "No debt data available" + + output = [] + output.append("\n" + "=" * 90) + output.append("ARCHITECTURAL DEBT BREAKDOWN") + output.append("=" * 90) + + total_hours = sum(d.estimated_hours for d in self.architectural_debts) + total_cost = sum(d.estimated_cost_usd for d in self.architectural_debts) + + output.append(f"\nTotal Debt: {total_hours:.1f} hours | ${total_cost:,.0f}") + + # Debt by type + by_type = defaultdict(lambda: {"hours": 0, "cost": 0, "count": 0}) + for debt in self.architectural_debts: + by_type[debt.debt_type]["hours"] += debt.estimated_hours + by_type[debt.debt_type]["cost"] += debt.estimated_cost_usd + by_type[debt.debt_type]["count"] += 1 + + output.append("\nBy Debt Type:") + for debt_type, stats in sorted(by_type.items(), key=lambda x: x[1]["cost"], reverse=True): + percentage = (stats["cost"] / total_cost * 100) if total_cost > 0 else 0 + bar_length = int(percentage / 100 * 40) + bar = "β" * bar_length + "β" * (40 - bar_length) + + output.append(f"\n {debt_type}") + output.append(f" {bar} {percentage:.1f}%") + output.append(f" {stats['count']} files | {stats['hours']:.1f}hrs | ${stats['cost']:,.0f}") + + # Top debt contributors + output.append("\n\nTop 10 Debt Contributors:") + sorted_debts = sorted(self.architectural_debts, key=lambda x: x.estimated_cost_usd, reverse=True)[:10] + + for i, debt in enumerate(sorted_debts, 1): + cost_bar_length = int((debt.estimated_cost_usd / total_cost) * 50) + cost_bar = "β" * cost_bar_length + + output.append(f"\n {i}. 
{debt.file_path}") + output.append(f" {cost_bar} ${debt.estimated_cost_usd:,.0f}") + output.append(f" {debt.priority} | {debt.debt_type} | {debt.estimated_hours:.1f}hrs") + + return "\n".join(output) + + def export_visualization_html(self, output_path: str = "semantic_map.html"): + """Export interactive HTML visualization""" + viz_data = self.generate_3d_visualization_data() + + html_template = """ + +
+Interactive visualization of codebase in LJPW semantic space
+ +... and {len(files) - 5} more files
" + + clusters_html += "