In [2]:
import os, sys, tempfile, shutil, stat, subprocess, gc
from pathlib import Path
import joblib
import pandas as pd
import re
import git
from collections import defaultdict

# Ensure prograde_core is importable (it contains analyze_repository)
try:
    import prograde_core
    analyze_repository = prograde_core.analyze_repository
except Exception as e:
    print("Error: could not import prograde_core.analyze_repository:", e)
    raise

# Paths (adjust if files are elsewhere)
PROGRADE_MODEL_PATH = "prograde_model.joblib"
ARTIFACTS = {
    "stack_model": "tech_stack_classifier.joblib",
    "mlb": "mlb.joblib",
    "domain_model": "tech_domain_classifier.joblib",
    "domain_labels": "domain_labels.joblib",
    "quality_model": "quality_model.joblib",
    "schema_csv": "scored_dataset.csv"
}

# --- Utilities ---
def safe_load(path):
    try:
        return joblib.load(path)
    except Exception as e:
        return None

def git_commit_count(repo_path):
    try:
        out = subprocess.run(
            ["git", "rev-list", "--count", "HEAD"],
            cwd=repo_path, capture_output=True, text=True, check=True
        )
        return int(out.stdout.strip())
    except Exception:
        return 0

def readme_word_count(repo_path):
    try:
        for f in os.listdir(repo_path):
            if f.lower().startswith("readme"):
                with open(os.path.join(repo_path, f), "r", encoding="utf-8", errors="ignore") as fh:
                    return len(re.findall(r"\b\w+\b", fh.read()))
    except Exception:
        pass
    return 0

def extract_basic_ml_features(repo_path):
    """
    Build a simple ML feature dict similar to what the training script expects:
    counts of common extensions, readme words, commit counts, directory depth, presence flags.
    (Matches roughly the features you used earlier.)
    """
    features = {
        'num_commits': 0, 'readme_word_count': 0, 'directory_depth': 0,
        'has_test_folder': 0, 'has_eslint': 0, 'has_dockerfile': 0,
        'has_license': 0, 'has_gitignore': 0, 'has_package_json': 0,
        'has_pom_xml': 0, 'has_requirements_txt': 0,
        'count_py': 0, 'count_js': 0, 'count_md': 0, 'count_json': 0,
        'count_html': 0, 'count_css': 0, 'count_java': 0, 'count_ts': 0,
        'count_go': 0, 'count_rb': 0, 'count_php': 0,
    }
    features['num_commits'] = git_commit_count(repo_path)
    features['readme_word_count'] = readme_word_count(repo_path)
    max_depth = 0
    test_folders = {'test', 'tests', 'spec', 'specs', '__tests__'}
    for root, dirs, files in os.walk(repo_path):
        # avoid .git internals
        if ".git" in root.split(os.sep):
            continue
        depth = root.replace(repo_path, "").count(os.sep)
        if depth > max_depth: max_depth = depth
        for d in dirs:
            if d.lower() in test_folders:
                features['has_test_folder'] = 1
        for fname in files:
            lower = fname.lower()
            if lower == 'dockerfile': features['has_dockerfile'] = 1
            if lower.startswith('.eslintrc'): features['has_eslint'] = 1
            if lower.startswith('license'): features['has_license'] = 1
            if lower == '.gitignore': features['has_gitignore'] = 1
            if lower == 'package.json': features['has_package_json'] = 1
            if lower == 'pom.xml': features['has_pom_xml'] = 1
            if lower == 'requirements.txt': features['has_requirements_txt'] = 1
            _, ext = os.path.splitext(lower)
            if ext == '.py': features['count_py'] += 1
            elif ext == '.js': features['count_js'] += 1
            elif ext in ('.md', '.markdown'): features['count_md'] += 1
            elif ext == '.json': features['count_json'] += 1
            elif ext == '.html': features['count_html'] += 1
            elif ext == '.css': features['count_css'] += 1
            elif ext == '.java': features['count_java'] += 1
            elif ext == '.ts': features['count_ts'] += 1
            elif ext == '.go': features['count_go'] += 1
            elif ext == '.rb': features['count_rb'] += 1
            elif ext == '.php': features['count_php'] += 1
    features['directory_depth'] = max_depth
    return features

def align_features_to_schema(feature_dict, schema_csv_path):
    """Read header of schema CSV and align feature dict into a dataframe row (missing -> 0)."""
    if not os.path.exists(schema_csv_path):
        # no schema: just return df with features present
        return pd.DataFrame([feature_dict])
    header = pd.read_csv(schema_csv_path, nrows=0).columns.tolist()
    # training features = header minus NON_FEATURE_COLS found earlier in user's code
    NON_FEATURE_COLS = ['repo_url','repo_name','tech_stack','tech_domain','quality_score']
    training_features = [c for c in header if c not in NON_FEATURE_COLS]
    df = pd.DataFrame([feature_dict])
    df_aligned = df.reindex(columns=training_features, fill_value=0)
    return df_aligned

# --- Load artifacts if present ---
print("Loading artifacts (if available)...")
models = {}
for k, p in ARTIFACTS.items():
    if os.path.exists(p):
        try:
            models[k] = joblib.load(p)
            print(f"  - loaded {p}")
        except Exception as e:
            print(f"  - failed to load {p}: {e}")
    else:
        print(f"  - not found: {p}")

# load prograde_model.joblib if present (may be saved ProGrade instance)
prograde_model_obj = None
if os.path.exists(PROGRADE_MODEL_PATH):
    try:
        prograde_model_obj = joblib.load(PROGRADE_MODEL_PATH)
        print(f"Loaded prograde_model from {PROGRADE_MODEL_PATH} (type: {type(prograde_model_obj)})")
    except Exception as e:
        print(f"Could not load {PROGRADE_MODEL_PATH}: {e}")
else:
    print(f"prograde_model.joblib not found at {PROGRADE_MODEL_PATH}")

# --- Get repo input from user in notebook ---
repo_input = input("Enter a GitHub repo URL (https.. or git@.. or local path):\n> ").strip()
if not repo_input:
    raise SystemExit("No input provided.")

# prepare temp clone if URL
is_url = repo_input.startswith("http://") or repo_input.startswith("https://") or repo_input.startswith("git@")
tempdir = None
repo_path = repo_input
try:
    if is_url:
        tempdir = tempfile.mkdtemp(prefix="prograde_nb_clone_")
        print("Cloning (shallow) to temporary folder:", tempdir)
        try:
            repo_obj = git.Repo.clone_from(repo_input, tempdir, depth=1)
            # try to unshallow quickly: if fails we'll still have code for heuristic analysis
            shallow_file = os.path.join(tempdir, ".git", "shallow")
            if os.path.exists(shallow_file):
                try:
                    repo_obj.git.fetch("--unshallow")
                except Exception:
                    # ignore - we still have a working checkout
                    pass
        except Exception as e:
            # try full clone fallback
            print("Shallow clone failed, trying full clone:", e)
            repo_obj = git.Repo.clone_from(repo_input, tempdir)
        repo_path = tempdir
    else:
        # local path
        if not os.path.exists(repo_input):
            raise FileNotFoundError(f"Local path does not exist: {repo_input}")
        repo_path = os.path.abspath(repo_input)

    # 1) Heuristic analysis (repo name, tech stack, contributors, heuristic scores)
    print("\nRunning heuristic analyze_repository(...) (ProGrade core)...")
    try:
        heuristic_report = analyze_repository(repo_path)
    except Exception as e:
        print("Heuristic analyze_repository raised:", e)
        heuristic_report = {}

    # 2) If prograde_model object has analyze(repo_path), use it (this may be preferred)
    model_report = None
    if prograde_model_obj is not None and hasattr(prograde_model_obj, "analyze"):
        try:
            print("\nCalling model.analyze(...) from prograde_model.joblib...")
            model_report = prograde_model_obj.analyze(repo_path)
            print("Model analyze returned successfully.")
        except Exception as e:
            print("model.analyze failed:", e)
            model_report = None

    # 3) If separate ML artifacts present, perform ML predictions
    ml_predictions = {}
    if all(k in models for k in ("quality_model", "domain_model", "domain_labels")) and os.path.exists(ARTIFACTS["schema_csv"]):
        print("\nRunning ML predictions (quality + domain) using separate artifacts...")
        feat_dict = extract_basic_ml_features(repo_path)
        df_aligned = align_features_to_schema(feat_dict, ARTIFACTS["schema_csv"])
        try:
            quality = models["quality_model"].predict(df_aligned)[0]
            domain_enc = models["domain_model"].predict(df_aligned)
            # domain_labels may be LabelEncoder saved
            domain = models["domain_labels"].inverse_transform(domain_enc)[0] if hasattr(models["domain_labels"], "inverse_transform") else str(domain_enc)
            ml_predictions = {"quality_score": float(quality), "tech_domain_pred": domain}
            print("ML predictions ready.")
        except Exception as e:
            print("ML prediction failed:", e)
            ml_predictions = {}

    # --- Build final combined report ---
    print("\n" + "="*50)
    print(f"ðŸš€ Analysis Report for: {repo_input}")
    print("="*50)

    # Repo name (choose best available)
    repo_name = heuristic_report.get("repo_name") or (model_report.get("repo_name") if model_report else None) \
                or os.path.basename(os.path.normpath(repo_path))
    print(f"\nRepo Name: {repo_name}")

    # Tech Stack detected (heuristic preferred)
    tech_stack = heuristic_report.get("tech_stack") or (model_report.get("tech_stack") if model_report else None)
    print_tech = False
    if tech_stack:
        print("\nTech Stack (Detected):")
        order = ["languages","frameworks","databases","other_tools","ai_coding_assistants","apis_and_services"]
        for cat in order:
            items = tech_stack.get(cat) or tech_stack.get(cat.replace("other_tools","apis_and_services")) or []
            if items:
                print(f"  --- {cat.replace('_',' ').title()} ---")
                for t in sorted(items):
                    print(f"    - {t}")
                print_tech = True
    if not tech_stack or not print_tech:
        print("\nTech Stack (Detected):  - (none detected)")

    # Domain (prefer model_report, else ML separate)
    domain = None
    if model_report and model_report.get("domains"):
        domain = ", ".join(model_report.get("domains"))
    elif ml_predictions.get("tech_domain_pred"):
        domain = ml_predictions["tech_domain_pred"]
    else:
        # heuristic can provide domains too
        hdoms = heuristic_report.get("domains")
        if hdoms:
            domain = ", ".join(hdoms)
    print(f"\nDomain (Detected): {domain or '(unknown)'}")

    # Scores: prefer model_report 'scores' (all categories), fallback to heuristic scores or ml quality
    scores = None
    if model_report and model_report.get("scores"):
        scores = model_report["scores"]
    elif heuristic_report and heuristic_report.get("scores"):
        scores = heuristic_report["scores"]
    elif ml_predictions.get("quality_score") is not None:
        scores = {"predicted_quality_score": ml_predictions["quality_score"]}
    if scores:
        print("\nScores:")
        # if scores is dict print keys
        if isinstance(scores, dict):
            for k, v in scores.items():
                # nice formatting
                if isinstance(v, float):
                    print(f"  - {k.replace('_',' ').title()}: {v:.2f}")
                else:
                    print(f"  - {k.replace('_',' ').title()}: {v}")
        else:
            print("  -", scores)
    else:
        print("\nScores: (none)")

    # Contributors: prefer model_report then heuristic
    contributors = None
    if model_report and model_report.get("contributors"):
        contributors = model_report["contributors"]
    elif heuristic_report and heuristic_report.get("contributors"):
        contributors = heuristic_report["contributors"]
    if contributors:
        print("\nTop Contributors:")
        # contributors may be list of dicts or strings
        if isinstance(contributors, list) and contributors and isinstance(contributors[0], dict):
            toshow = contributors if len(contributors)<=3 else contributors[:3]
            for c in toshow:
                print(f"  - {c.get('name','unknown')}: {c.get('commits',0)} commit{'s' if c.get('commits',0)!=1 else ''}")
        else:
            # fallback: print raw
            for c in (contributors if len(contributors)<=3 else contributors[:3]):
                print(f"  - {c}")
    else:
        print("\nTop Contributors: (none)")

    print("\n" + "="*50)

finally:
    # cleanup
    if 'tempdir' in locals() and tempdir and os.path.exists(tempdir):
        try:
            shutil.rmtree(tempdir)
        except Exception:
            pass

print("Done.")


Loading artifacts (if available)...
  - loaded tech_stack_classifier.joblib
  - loaded mlb.joblib
  - loaded tech_domain_classifier.joblib
  - loaded domain_labels.joblib
  - loaded quality_model.joblib
  - failed to load scored_dataset.csv: pop from empty list
Loaded prograde_model from prograde_model.joblib (type: <class 'prograde_model.ProGrade'>)


Enter a GitHub repo URL (https.. or git@.. or local path):
>  https://github.com/rajank18/ScanX_frontend.git


Cloning (shallow) to temporary folder: C:\Users\odcha\AppData\Local\Temp\prograde_nb_clone_l5piaddc

Running heuristic analyze_repository(...) (ProGrade core)...

Calling model.analyze(...) from prograde_model.joblib...
Model analyze returned successfully.

Running ML predictions (quality + domain) using separate artifacts...
ML predictions ready.

ðŸš€ Analysis Report for: https://github.com/rajank18/ScanX_frontend.git

Repo Name: prograde_nb_clone_l5piaddc

Tech Stack (Detected):
  --- Languages ---
    - CSS
    - HTML
    - JavaScript
  --- Frameworks ---
    - React

Domain (Detected): Frontend Web Development

Scores:
  - Code Quality: 2.00
  - Comment Management: 1.00
  - Documentation: 3.00
  - Contribution: 4.00
  - Overall: 2.50

Top Contributors:
  - rajank18: 9 commits

Done.
