In [3]:
import json
import os
import subprocess
import tempfile

import numpy as np
import pandas as pd
import torch
from joblib import dump
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments

# ==== Config ====
# Hugging Face checkpoint used for both the tokenizer and the classifier.
MODEL_NAME = "microsoft/codebert-base"
# Input CSVs are read from DATA_DIR; fitted artifacts are written to the other two.
DATA_DIR = "./data"
MODEL_DIR = "./models"
CODEBERT_DIR = "./codebert_finetuned"
# Language tags the pipeline is meant to handle (Python only for now).
SUPPORTED_LANGS = ["py"]

# ==== Setup ====
# Create the output directories up front; existing ones are left untouched.
for _output_dir in (MODEL_DIR, CODEBERT_DIR):
    os.makedirs(_output_dir, exist_ok=True)

# Shared tokenizer instance, loaded once at module level.
tokenizer = RobertaTokenizer.from_pretrained(MODEL_NAME)

# ==== 1. Load dataset from CSV ====
def load_dataset_from_csv(data_dir=None):
    """Load the code_search_net train/test CSVs and return them as one labelled corpus.

    Args:
        data_dir: Directory holding ``code_search_net_train.csv`` and
            ``code_search_net_test.csv``. When omitted, the module-level
            ``DATA_DIR`` is used.

    Returns:
        A ``(codes, labels, languages)`` tuple of equal-length lists: the code
        snippets (train rows first, then test rows), placeholder 0/1 labels,
        and the language tag ``'py'`` for every snippet.

    Raises:
        KeyError: If none of the known code columns is present in both CSVs.
    """
    # Resolve the default lazily so the current value of DATA_DIR is honoured.
    base_dir = DATA_DIR if data_dir is None else data_dir
    train_df = pd.read_csv(os.path.join(base_dir, "code_search_net_train.csv"))
    test_df = pd.read_csv(os.path.join(base_dir, "code_search_net_test.csv"))

    print("üìã Train Columns:", train_df.columns.tolist())
    print("üìã Test Columns:", test_df.columns.tolist())

    # Pick the first candidate column that both files share.
    candidates = ('code', 'content', 'func_code_string', 'snippet')
    code_column = next(
        (name for name in candidates if name in train_df.columns and name in test_df.columns),
        None,
    )
    if code_column is None:
        raise KeyError("‚ùå Could not find a valid code column in the CSV files.")

    # Drop missing cells before the string cast; otherwise NaN would become the
    # literal snippet "nan" and be linted/tokenized as if it were real code.
    train_codes = train_df[code_column].dropna().astype(str).tolist()
    test_codes = test_df[code_column].dropna().astype(str).tolist()

    # Placeholder labels: alternate 0/1 by position within each split.
    train_labels = [i % 2 for i in range(len(train_codes))]
    test_labels = [i % 2 for i in range(len(test_codes))]

    codes = train_codes + test_codes
    return codes, train_labels + test_labels, ['py'] * len(codes)

# ==== 2. Linter Feature Extractor ====
def extract_linter_features(code_list, lang_list):
    """Run pylint on each snippet and count its error/warning/convention messages.

    Args:
        code_list: Source snippets, one string per sample.
        lang_list: File extension for each snippet (e.g. ``"py"``), paired
            positionally with ``code_list``. pylint is invoked regardless of
            the extension.

    Returns:
        Integer ``numpy`` array of shape ``(n_samples, 3)`` whose columns are
        the number of pylint message ids starting with ``E``, ``W`` and ``C``.
        When a lint run fails, the counts gathered up to the failure (normally
        all zeros) are used for that snippet.
    """
    features = []
    # Scratch files live in a private directory so they can neither overwrite
    # nor be left behind in the notebook's working directory.
    with tempfile.TemporaryDirectory() as scratch_dir:
        for i, (code, lang) in enumerate(zip(code_list, lang_list)):
            temp_file = os.path.join(scratch_dir, f"temp_{i}.{lang}")
            with open(temp_file, "w", encoding="utf-8") as f:
                f.write(code)

            counts = {"E": 0, "W": 0, "C": 0}
            try:
                result = subprocess.run(['pylint', temp_file, '--output-format=json'], capture_output=True, text=True)
                # Prefer stdout; fall back to stderr when stdout is empty.
                output = result.stdout.strip() or result.stderr.strip()
                data = json.loads(output) if output else []
                for item in data:
                    # The first character of the message id is its category.
                    category = item.get("message-id", "")[:1]
                    if category in counts:
                        counts[category] += 1
            except Exception as e:
                # Best effort: one failing snippet must not abort the batch.
                print(f"‚ö†Ô∏è Linter failed: {e}")
            finally:
                # Remove each file right away so large batches do not pile up on disk.
                if os.path.exists(temp_file):
                    os.remove(temp_file)

            features.append([counts["E"], counts["W"], counts["C"]])

    # The explicit reshape keeps the (n, 3) layout even when there are no samples.
    return np.array(features, dtype=int).reshape(-1, 3)

# ==== 3. Fine-tune CodeBERT ====
def fine_tune_codebert(code_samples, labels):
    """Fine-tune a two-class CodeBERT classifier and save it to ``CODEBERT_DIR``.

    Args:
        code_samples: List of source-code strings.
        labels: Integer class label for each entry of ``code_samples``.

    The samples are tokenized to fixed-length sequences, split 80/20 into a
    training and an evaluation set, trained for three epochs with the Hugging
    Face ``Trainer``, and the model is then written with ``save_pretrained``.
    Nothing is returned.
    """

    class CodeDataset(torch.utils.data.Dataset):
        """Pairs token-id rows with labels in the dict layout handed to ``Trainer``."""

        def __init__(self, input_ids, labels):
            self.input_ids = input_ids
            self.labels = labels

        def __len__(self):
            return len(self.labels)

        def __getitem__(self, idx):
            row = self.input_ids[idx]
            # The mask is rebuilt from the ids: True wherever the token is not padding.
            mask = (row != tokenizer.pad_token_id)
            target = torch.tensor(self.labels[idx], dtype=torch.long)
            return {"input_ids": row, "attention_mask": mask, "labels": target}

    # Pad/truncate every sample to 512 tokens so the ids form one rectangular tensor.
    encoded = tokenizer(code_samples, padding="max_length", truncation=True, max_length=512, return_tensors="pt")
    ids_train, ids_eval, labels_train, labels_eval = train_test_split(
        encoded['input_ids'], labels, test_size=0.2, random_state=42
    )
    train_dataset = CodeDataset(ids_train, labels_train)
    eval_dataset = CodeDataset(ids_eval, labels_eval)

    classifier = RobertaForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)

    # Evaluate and checkpoint once per epoch; load_best_model_at_end asks the
    # Trainer to reload the best checkpoint when training finishes.
    run_config = TrainingArguments(
        output_dir=CODEBERT_DIR,
        num_train_epochs=3,
        per_device_train_batch_size=4,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_dir="./logs",
        load_best_model_at_end=True,
    )

    trainer = Trainer(
        model=classifier,
        args=run_config,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )
    trainer.train()
    classifier.save_pretrained(CODEBERT_DIR)

# ==== 4. Main ====
def main():
    """Run the whole pipeline end to end.

    Loads the dataset, fine-tunes CodeBERT, extracts linter features, fits a
    logistic-regression model on those features, prints its metrics on the
    held-out 20% and writes the model plus the test predictions to ``MODEL_DIR``.
    """
    print("üìÇ Loading dataset from CSV...")
    code_samples, labels, languages = load_dataset_from_csv()
    if not code_samples:
        print("‚ùå No code samples found.")
        return

    print("üß† Fine-tuning CodeBERT...")
    fine_tune_codebert(code_samples, labels)

    print("üß™ Extracting linter features...")
    linter_features = extract_linter_features(code_samples, languages)

    print("üìà Training Logistic Regression model...")
    feats_fit, feats_eval, labels_fit, labels_eval = train_test_split(
        linter_features, labels, test_size=0.2, random_state=42
    )

    classifier = LogisticRegression()
    classifier.fit(feats_fit, labels_fit)
    model_path = os.path.join(MODEL_DIR, "logistic_model_python.joblib")
    dump(classifier, model_path)
    print("‚úÖ Logistic Regression trained and saved.")

    # Score the held-out split with each metric in turn.
    y_pred = classifier.predict(feats_eval)
    accuracy, precision, recall, f1 = (
        metric(labels_eval, y_pred)
        for metric in (accuracy_score, precision_score, recall_score, f1_score)
    )
    conf_matrix = confusion_matrix(labels_eval, y_pred)

    print(f"\nüìä Logistic Regression Evaluation:")
    print(f"   Accuracy : {accuracy:.4f}")
    print(f"   Precision: {precision:.4f}")
    print(f"   Recall   : {recall:.4f}")
    print(f"   F1 Score : {f1:.4f}")
    print(f"   Confusion Matrix:\n{conf_matrix}")

    # Persist the evaluation rows next to their true and predicted labels.
    results = pd.DataFrame(feats_eval, columns=["Errors", "Warnings", "Conventions"])
    results["True_Label"] = labels_eval
    results["Predicted_Label"] = y_pred
    results_path = os.path.join(MODEL_DIR, "logistic_test_results.csv")
    results.to_csv(results_path, index=False)
    print("üíæ Test data and predictions saved to logistic_test_results.csv")

# Entry point: executing this cell satisfies the guard (the recorded output
# below shows the pipeline ran), so the full pipeline starts immediately.
if __name__ == "__main__":
    main()


üìÇ Loading dataset from CSV...
üìã Train Columns: ['repository_name', 'func_path_in_repository', 'func_name', 'whole_func_string', 'language', 'func_code_string', 'func_code_tokens', 'func_documentation_string', 'func_documentation_tokens', 'split_name', 'func_code_url']
üìã Test Columns: ['repository_name', 'func_path_in_repository', 'func_name', 'whole_func_string', 'language', 'func_code_string', 'func_code_tokens', 'func_documentation_string', 'func_documentation_tokens', 'split_name', 'func_code_url']
üß† Fine-tuning CodeBERT...


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss
1,0.7092,0.699729
2,0.7022,0.696148
3,0.6968,0.696329


üß™ Extracting linter features...
üìà Training Logistic Regression model...
‚úÖ Logistic Regression trained and saved.

üìä Logistic Regression Evaluation:
   Accuracy : 0.4913
   Precision: 0.4811
   Recall   : 0.8325
   F1 Score : 0.6098
   Confusion Matrix:
[[ 75 343]
 [ 64 318]]
üíæ Test data and predictions saved to logistic_test_results.csv


In [2]:
import shutil
import os

# Locate the Hugging Face home directory: $HF_HOME wins, then
# $XDG_CACHE_HOME/huggingface, and only then the ~/.cache/huggingface default.
# Checking the default alone would report "not found" while the real cache
# stays on disk whenever one of those variables is set.
cache_dir = os.path.expanduser(
    os.environ.get("HF_HOME")
    or os.path.join(os.environ.get("XDG_CACHE_HOME") or "~/.cache", "huggingface")
)

# NOTE: destructive — the whole directory tree is removed, not just one model.
if os.path.isdir(cache_dir):
    shutil.rmtree(cache_dir)
    print("‚úÖ Hugging Face cache deleted.")
else:
    print("‚ÑπÔ∏è Cache directory not found.")


‚úÖ Hugging Face cache deleted.
