In [1]:
#!/usr/bin/env python3
import os
import sys
import csv
import json
import shutil
import argparse
import subprocess
from pathlib import Path

!pip install GitPython
!pip install pylint
!pip install radon



In [2]:
# ----------------------------
# Violation Class Definition
# ----------------------------
class Violation:
    """One finding reported by an analyzer.

    The constructor stores its five arguments unchanged on the instance;
    no validation or conversion is performed.
    """

    def __init__(self, file, line, category, rule, description):
        # Location of the finding.
        self.file, self.line = file, line
        # category: e.g., 'Pylint' or 'Cyclomatic Complexity'
        # rule: for pylint the message id/symbol; for radon 'Complexity'
        self.category, self.rule = category, rule
        # Human-readable explanation of the finding.
        self.description = description

In [3]:
from git import Repo
from radon.complexity import cc_visit, cc_rank
# ----------------------------
# Analysis: Pylint for Python Files
# ----------------------------
def analyze_python_file_pylint(filepath):
    """
    Runs pylint on a single Python file and returns a list of Violations.

    pylint is started as ``<current interpreter> -m pylint --output-format=json``
    and its standard output is parsed as a JSON list of issue objects.

    Args:
        filepath: Path of the Python file to lint.

    Returns:
        A list of Violation objects with category "Pylint", one per reported
        issue. The list is empty when pylint reports no issues, produces no
        output, or produces output that is not a JSON list; in the two failure
        cases a diagnostic line is printed instead of raising.
    """
    command = [sys.executable, "-m", "pylint", "--output-format=json", filepath]
    print(f"Running pylint on {filepath}...")
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    if not result.stdout.strip():
        # Nothing to parse: pylint presumably failed before reporting (e.g. it is
        # not installed). Show its stderr rather than a cryptic JSON decode error.
        reason = result.stderr.strip() or f"exit code {result.returncode}"
        print(f"Error processing pylint output for {filepath}: no output from pylint ({reason})")
        return []

    try:
        issues = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        print(f"Error processing pylint output for {filepath}: {e}")
        return []

    if not isinstance(issues, list):
        print(f"Error processing pylint output for {filepath}: "
              f"expected a JSON list, got {type(issues).__name__}")
        return []

    violations = []
    for issue in issues:
        if not isinstance(issue, dict):
            # Skip entries that do not have the expected object shape.
            continue
        # Use keys like 'path', 'line', 'symbol', and 'message'
        violations.append(Violation(
            file=issue.get("path", filepath),
            line=str(issue.get("line", "")),
            category="Pylint",
            rule=issue.get("symbol", ""),
            description=issue.get("message", "")
        ))
    return violations

# ----------------------------
# Analysis: Radon (Cyclomatic Complexity)
# ----------------------------
def analyze_python_file_radon(filepath, min_complexity=None):
    """
    Uses radon to analyze the cyclomatic complexity of a Python file.

    The file is read as UTF-8 (undecodable bytes are dropped) and passed to
    cc_visit; a violation record is created for each block it returns.

    Args:
        filepath: Path of the Python file to analyze.
        min_complexity: Optional threshold. When given, only blocks whose
            complexity is at least this value are reported. The default
            (None) applies no filtering, so every block is reported.

    Returns:
        A list of Violation objects with category "Cyclomatic Complexity".
        If reading or analyzing the file raises, a diagnostic line is printed
        and the violations collected so far (possibly none) are returned.
    """
    violations = []
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
            source = f.read()
        for block in cc_visit(source):
            if min_complexity is not None and block.complexity < min_complexity:
                # Below the caller's reporting threshold.
                continue
            desc = (f"Function '{block.name}' has cyclomatic complexity {block.complexity} "
                    f"(Rank: {cc_rank(block.complexity)})")
            violations.append(Violation(
                file=filepath,
                line=str(block.lineno),
                category="Cyclomatic Complexity",
                rule="Complexity",
                description=desc
            ))
    except Exception as e:
        # Deliberately best-effort: one unreadable or unparsable file must not
        # abort the analysis of the remaining files.
        print(f"Error processing radon analysis for {filepath}: {e}")
    return violations

# ----------------------------
# Analyze All Python Files in the Repository
# ----------------------------
def analyze_python_repo(repo_path):
    """
    Recursively scans repo_path and analyzes every file whose name ends in '.py'.

    Each matching file is handed first to analyze_python_file_pylint and then to
    analyze_python_file_radon; their results are concatenated in that order.
    Prints the total number of findings and returns the combined list.
    """
    collected = []
    for dirpath, _dirnames, filenames in os.walk(repo_path):
        for name in filenames:
            if not name.endswith('.py'):
                continue
            target = os.path.join(dirpath, name)
            # Lint findings first, then complexity findings for the same file.
            collected += analyze_python_file_pylint(target)
            collected += analyze_python_file_radon(target)
    print(f"Total issues found in Python files: {len(collected)}")
    return collected


In [4]:
# ----------------------------
# Report Generators
# ----------------------------
def generate_csv_report(violations, output_path):
    """
    Writes the violations to output_path as a UTF-8 CSV file.

    The first row is the header File, Line, Category, Rule, Description; every
    following row carries the matching attributes of one violation.
    """
    header = ["File", "Line", "Category", "Rule", "Description"]
    with open(output_path, 'w', newline='', encoding='utf-8') as out:
        report = csv.writer(out)
        report.writerow(header)
        report.writerows(
            [item.file, item.line, item.category, item.rule, item.description]
            for item in violations
        )
    print("CSV report generated at:", output_path)

def generate_html_report(violations, output_path):
    """
    Generates an HTML report including a pie chart (Google Charts) summarizing issue distribution.

    The page contains a chart with one slice per "category: rule" pair and a
    table with one row per violation. The chart script is loaded from
    www.gstatic.com when the page is opened.

    All violation text is escaped before being embedded: table cells go through
    html.escape and chart labels are emitted as JSON string literals, so file
    names or messages containing characters such as <, &, or quotes cannot break
    the page or inject markup/script.

    Args:
        violations: Iterable of objects exposing file, line, category, rule and
            description attributes.
        output_path: Destination file; written as UTF-8.
    """
    from html import escape  # local import keeps this block self-contained

    # Count issues by category and rule.
    counts = {}
    for v in violations:
        key = f"{v.category}: {v.rule}"
        counts[key] = counts.get(key, 0) + 1

    page = []
    page.append("<html><head>")
    # The file is written as UTF-8 below; declare it so browsers do not guess.
    page.append("<meta charset='utf-8'>")
    page.append("<script src='https://www.gstatic.com/charts/loader.js'></script>")
    page.append("</head><body>")
    page.append("<h2>Python Code Analysis Report</h2>")
    page.append("<div id='chart' style='width: 800px; height: 600px;'></div>")
    page.append("<script>")
    page.append("google.charts.load('current', {packages:['corechart']});")
    page.append("google.charts.setOnLoadCallback(drawChart);")
    page.append("function drawChart() {")
    page.append("var data = new google.visualization.DataTable();")
    page.append("data.addColumn('string', 'Issue');")
    page.append("data.addColumn('number', 'Count');")
    page.append("data.addRows([")
    for key, count in counts.items():
        # json.dumps produces a correctly quoted JS string literal; "</" is also
        # rewritten so a label can never close the surrounding <script> element.
        label = json.dumps(key).replace("</", "<\\/")
        page.append(f"[{label}, {count}],")
    page.append("]);")
    page.append("var options = {title: 'Issue Distribution', pieHole: 0.4};")
    page.append("var chart = new google.visualization.PieChart(document.getElementById('chart'));")
    page.append("chart.draw(data, options);")
    page.append("}")
    page.append("</script>")
    page.append("<h3>Detailed Results</h3>")
    page.append("<table border='1' cellpadding='5' cellspacing='0'>")
    page.append("<tr><th>File</th><th>Line</th><th>Category</th><th>Rule</th><th>Description</th></tr>")
    for v in violations:
        cells = "".join(
            f"<td>{escape(str(value))}</td>"
            for value in (v.file, v.line, v.category, v.rule, v.description)
        )
        page.append(f"<tr>{cells}</tr>")
    page.append("</table>")
    page.append("</body></html>")
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("\n".join(page))
    print("HTML report generated at:", output_path)


In [6]:
import stat

def on_rm_error(func, path, exc_info):
    """
    Error callback that makes path writable and retries the failed operation.

    func is invoked again with path after the permission change; exc_info is
    accepted but not used.
    """
    writable = stat.S_IWRITE
    os.chmod(path, writable)
    func(path)

In [16]:
# ----------------------------
# Git Repository Cloning
# ----------------------------
def clone_repository(repo_url, local_path):
    """
    Clones repo_url into local_path via Repo.clone_from.

    An existing local_path is deleted beforehand, with on_rm_error installed as
    the rmtree error handler, so the clone always starts from an empty location.
    """
    target_exists = os.path.exists(local_path)
    if target_exists:
        # Remove leftovers from a previous run before cloning.
        shutil.rmtree(local_path, onerror=on_rm_error)
    print(f"Cloning repository from {repo_url} into {local_path}...")
    Repo.clone_from(repo_url, local_path)
    print("Repository cloned.")

In [11]:
# --------------------------------------
# Main Function for command line inputs
# --------------------------------------
# def main():
#     parser = argparse.ArgumentParser(
#         description="Static Analyzer for Python files with elaborate analysis (Pylint & Cyclomatic Complexity)."
#     )
#     parser.add_argument("-r", "--repo", required=True, help="Git repository URL containing Python files")
#     parser.add_argument("-o", "--output", default="results.csv", help="CSV output path")
#     parser.add_argument("-html", "--html_report", default="report.html", help="HTML report output path")
#     args = parser.parse_args()

#     local_path = "temp_repo"
#     clone_repository(args.repo, local_path)

#     # Run analysis on Python files.
#     violations = analyze_python_repo(local_path)

#     # Generate reports.
#     generate_csv_report(violations, args.output)
#     generate_html_report(violations, args.html_report)



In [14]:
# ----------------------------
# Interactive main for notebooks / interactive environments
# ----------------------------
def main():
    """
    Interactive entry point: prompts for inputs, analyzes a repository, writes reports.

    Prompts for the Git repository URL and the CSV/HTML report file names
    (blank answers fall back to results.csv and report.html), clones the
    repository into ./temp_repo, runs the analysis and generates both reports.

    The temporary clone is removed in a finally block, so it is cleaned up
    even when cloning, analysis or report generation raises; the exception
    itself still propagates to the caller.
    """
    repo_url = input("Enter the Git repository URL: ").strip()
    output_csv = input("Enter CSV report file name (default: results.csv): ").strip() or "results.csv"
    html_report = input("Enter HTML report file name (default: report.html): ").strip() or "report.html"

    local_path = "temp_repo"
    try:
        clone_repository(repo_url, local_path)

        python_violations = analyze_python_repo(local_path)
        print(f"Total violations found: {len(python_violations)}")

        generate_csv_report(python_violations, output_csv)
        generate_html_report(python_violations, html_report)
    finally:
        # Clean up the cloned repository. The existence check avoids masking the
        # original error with a second one when the clone never got created.
        if os.path.exists(local_path):
            shutil.rmtree(local_path, onerror=on_rm_error)
            print("Temporary repository directory removed.")

In [17]:
# ----------------------------
# Entry Point
# ----------------------------
# Start the interactive workflow only when this code runs as the main module,
# not when it is imported from elsewhere.
if __name__ == "__main__":
    main()

Enter the Git repository URL:  https://github.com/Irfan7587/streamlit
Enter CSV report file name (default: results.csv):  
Enter HTML report file name (default: report.html):  


Cloning repository from https://github.com/Irfan7587/streamlit into temp_repo...
Repository cloned.
Running pylint on temp_repo\app.py...
Running pylint on temp_repo\scoring_latest.py...
Total issues found in Python files: 64
Total violations found: 64
CSV report generated at: results.csv
HTML report generated at: report.html
Temporary repository directory removed.
