# Code Similarity, AI Detection, and Plagiarism Analysis Notebook

This notebook provides a pipeline for analyzing Python code submissions with a pre-trained language model and lightweight heuristics. It covers:

- **AI-generated code detection**: Uses the GPT-2 language model to estimate code perplexity and combines it with pattern analysis to flag code that is likely AI-generated.
- **Plagiarism detection**: Compares normalized code against a built-in set of common algorithm snippets, using hash lookup for exact matches and sequence similarity for near matches.
- **Batch and parallel analysis**: Supports efficient batch processing and parallel execution for large-scale code review.
- **Performance metrics**: Includes benchmarking for single and batch analysis, with cache optimization for repeated code checks.
- **Device selection**: Automatically detects and utilizes GPU (CUDA) if available for faster inference, otherwise falls back to CPU.
- **Model saving**: Demonstrates saving Hugging Face models and tokenizers for production deployment.

## Key Components
- **OptimizedAIGeneratedCodeDetector**: Singleton class for AI detection using GPT-2, with caching and fast pattern analysis.
- **OptimizedPlagiarismDetector**: Detects plagiarism by matching code against pre-computed hashes and known patterns.
- **OptimizedCodeChecker**: Integrates AI and plagiarism detection, supports parallel and batch analysis, and provides actionable recommendations.
- **Test Suite**: Performance tests for single and batch code analysis, including suspicious, plagiarized, and original code examples.

## Usage
1. **Check Python environment and device**: Ensure required libraries are installed and GPU is available for optimal performance.
2. **Import libraries and classes**: All dependencies are imported and classes are defined for immediate use.
3. **Run analysis**: Use the provided test suite or custom code snippets to analyze for AI generation and plagiarism.
4. **Save models**: Save trained or pre-trained models for future use or deployment.

## Requirements
- Python 3.8+
- PyTorch
- Transformers (Hugging Face)
- NumPy, requests, pymongo

---

> **Note:** The detection logic is modular: each detector is a standalone class that can be integrated into backend services or automated code review pipelines.

In [1]:
import sys
print(sys.executable)

e:\project\ML\syntax_env\python.exe


### Device Selection and GPU Availability

This notebook is optimized to run on systems with a CUDA-enabled GPU for faster code analysis and AI detection. The device check below will automatically detect and display the available GPU. If no GPU is found, the notebook will default to CPU execution.

- **Why GPU?** GPU acceleration significantly speeds up model inference and batch processing, making large-scale code review much more efficient.
- **Automatic Detection:** The code cell checks for GPU availability and prints the device name if found, or notifies you if only CPU is available.

> **Recommendation:** For best performance, run this notebook on a machine with a CUDA-enabled GPU.

In [7]:
import torch
gpu_available = torch.cuda.is_available()
if gpu_available:
    print(f"GPU available: {torch.cuda.get_device_name(torch.cuda.current_device())}")
else:
    print("No GPU available")

GPU available: NVIDIA GeForce GTX 1650


### Project Dependencies

This notebook uses a combination of **data processing**, **AI/ML**, **NLP**, and **database** libraries for code analysis and detection tasks.

**Core Libraries:**
- `numpy` – Numerical computations and array operations  
- `torch` – PyTorch for deep learning and model inference  
- `transformers` – Hugging Face models and tokenizers (`AutoTokenizer`, `AutoModel`, `GPT2LMHeadModel`, `GPT2Tokenizer`)  

**Code Analysis & Processing:**
- `re` – Regular expressions for pattern matching  
- `ast` – Abstract Syntax Tree parsing for code inspection  
- `hashlib` – Generating hashes for code fingerprinting  
- `difflib` – Sequence matching for similarity detection  

**Performance & Utilities:**
- `functools.lru_cache` – Caching for faster repeated computations  
- `concurrent.futures.ThreadPoolExecutor` – Parallel execution  
- `warnings` – Suppressing unnecessary warnings  

**Web & Database:**
- `requests` – Sending HTTP requests  
- `pymongo.MongoClient` – Interacting with MongoDB  


> ⚡ **Tip:** Importing these libraries upfront ensures smooth execution for code analysis, AI detection, and database operations.
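
Among the modules listed above, `ast` gives structural access to code that a regular expression cannot. As a minimal sketch (the helper name `assigned_names` is hypothetical and not part of this notebook), the following collects only the names that a snippet actually binds, that is, assignment targets and function parameters:

```python
import ast

def assigned_names(source):
    """Collect names that are assigned to, plus function parameters."""
    names = set()
    for node in ast.walk(ast.parse(source)):
        # ast.Store marks a name that is being bound (assignment, loop target, ...)
        if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Store):
            names.add(node.id)
        elif isinstance(node, ast.arg):
            names.add(node.arg)
    return names

sample = '''
def calc(x, y):
    z = x + y  # add the inputs
    return z * 2
'''
print(sorted(assigned_names(sample)))  # ['x', 'y', 'z']
```

`ast.parse` raises `SyntaxError` on code that does not parse, so a caller handling arbitrary submissions would need to catch it.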


In [4]:
import numpy as np
import torch
import re
import ast
import hashlib
import difflib
from transformers import AutoTokenizer, AutoModel, GPT2LMHeadModel, GPT2Tokenizer
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
import warnings
warnings.filterwarnings('ignore')
import requests
from pymongo import MongoClient


---

## AI-Generated Code Detection Logic

This section explains the logic and methodology behind detecting AI-generated code submissions:

- **Perplexity Analysis**: Uses the GPT-2 language model to calculate the perplexity of a code snippet. Lower perplexity means the model finds the text more predictable, which this notebook treats as a heuristic signal of AI generation rather than as proof.
- **Pattern Recognition**: Analyzes code structure, variable naming conventions, comment ratios, and indentation consistency to identify characteristics typical of AI-generated code.
- **Caching for Speed**: Implements LRU caching to accelerate repeated analysis and improve scalability for large datasets.
- **Singleton Model Loading**: Ensures models are loaded only once per session, reducing memory usage and initialization time.
- **Batch and Parallel Processing**: Supports efficient batch analysis and parallel execution for rapid review of multiple code samples.

> This modular AI detection logic is designed for integration into automated code review systems, online judges, and educational platforms to help maintain code authenticity and integrity.
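
In `calculate_perplexity`, `outputs.loss` is the mean cross-entropy per token, so `torch.exp(outputs.loss)` is the perplexity. The arithmetic can be sketched without loading a model. The per-token probabilities here are made up for illustration and are not real GPT-2 output:

```python
import math

def perplexity_from_log_probs(token_log_probs):
    """Perplexity = exp of the mean negative log-likelihood per token."""
    if not token_log_probs:
        return float("inf")  # nothing to score
    mean_nll = -sum(token_log_probs) / len(token_log_probs)
    return math.exp(mean_nll)

# Hypothetical per-token probabilities assigned by a language model.
confident = [math.log(p) for p in (0.9, 0.8, 0.95, 0.85)]
uncertain = [math.log(p) for p in (0.1, 0.05, 0.2, 0.1)]

print(perplexity_from_log_probs(confident))  # close to 1: predictable text
print(perplexity_from_log_probs(uncertain))  # much larger: surprising text
```

A perplexity of `k` can be read as the model being, on average, as uncertain as if it were choosing uniformly among `k` tokens at each step.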

In [5]:
class OptimizedAIGeneratedCodeDetector:
    """Optimized AI detection with caching and faster inference"""
    
    _instance = None
    
    def __new__(cls):
        """Singleton pattern - load models only once"""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
            
        # Load models once and keep in memory
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Loading models on {self.device}...")
        
        self.gpt2_model = GPT2LMHeadModel.from_pretrained('gpt2').to(self.device)
        self.gpt2_tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.gpt2_tokenizer.pad_token = self.gpt2_tokenizer.eos_token
        
        # Eval mode disables dropout so repeated scoring is deterministic
        self.gpt2_model.eval()
        
        self._initialized = True
        print("Models loaded successfully!")
    
    @lru_cache(maxsize=1000)
    def calculate_perplexity(self, code):
        """Perplexity of the code under GPT-2; cached per identical code string"""
        try:
            code_lines = [line.strip() for line in code.split('\n') if line.strip()]
            code_text = ' '.join(code_lines)
            
            encodings = self.gpt2_tokenizer(
                code_text, 
                return_tensors='pt', 
                truncation=True, 
                max_length=512  # Reduced from 1024 for speed
            ).to(self.device)
            
            with torch.no_grad():
                outputs = self.gpt2_model(**encodings, labels=encodings['input_ids'])
                perplexity = torch.exp(outputs.loss)
                
            return float(perplexity)
        except Exception:
            # Any tokenization or inference failure yields infinite perplexity
            return float('inf')
    
    @lru_cache(maxsize=2000)
    def analyze_code_patterns(self, code):
        """Cached pattern analysis - reuses results for identical code strings"""
        features = {}
        
        lines = code.split('\n')
        total_lines = len(lines)
        
        # Fast comment detection
        comment_lines = sum(1 for line in lines if line.strip().startswith('#'))
        features['comment_ratio'] = comment_lines / max(total_lines, 1)
        
        # Fast identifier analysis with a regex instead of AST parsing
        try:
            # Matches every identifier-like token (including function names and
            # words inside comments or strings), so it is coarser than an AST walk
            var_pattern = r'\b[a-zA-Z_][a-zA-Z0-9_]*\b'
            var_names = re.findall(var_pattern, code)
            var_names = [v for v in var_names if v not in ['def', 'class', 'if', 'else', 'for', 'while', 'return']]
            
            if var_names:
                avg_name_length = np.mean([len(name) for name in var_names])
                descriptive_names = sum(1 for name in var_names if len(name) > 3 and '_' in name)
                features['avg_var_name_length'] = avg_name_length
                features['descriptive_name_ratio'] = descriptive_names / len(var_names)
            else:
                features['avg_var_name_length'] = 0
                features['descriptive_name_ratio'] = 0
        except Exception:
            features['avg_var_name_length'] = 0
            features['descriptive_name_ratio'] = 0
        
        # Fast indentation check
        indents = [len(line) - len(line.lstrip()) for line in lines if line.strip()]
        features['indent_consistency'] = 1 - (np.std(indents) / (np.mean(indents) + 1)) if indents else 0
        
        return tuple(features.items())  # Immutable tuple so callers cannot mutate the cached result
    
    def detect_ai_generated(self, code):
        """Combine perplexity and pattern features into a heuristic AI score"""
        # Use cached calculations
        perplexity = self.calculate_perplexity(code)
        patterns_tuple = self.analyze_code_patterns(code)
        patterns = dict(patterns_tuple)
        
        # Additive threshold scoring: each signal contributes a fixed weight
        ai_score = 0.0
        
        # Perplexity scoring
        ai_score += 0.4 if perplexity < 50 else (0.2 if perplexity < 100 else 0)
        
        # Pattern scoring
        pattern_scores = [
            0.2 if patterns['comment_ratio'] > 0.3 else 0,
            0.15 if patterns['avg_var_name_length'] > 8 else 0,
            0.15 if patterns['descriptive_name_ratio'] > 0.7 else 0,
            0.1 if patterns['indent_consistency'] > 0.9 else 0
        ]
        ai_score += sum(pattern_scores)
        
        return {
            'ai_probability': min(ai_score, 1.0),
            'perplexity': perplexity,
            'patterns': patterns,
            'is_ai_generated': ai_score > 0.6
        }
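
The scoring in `detect_ai_generated` is a sum of fixed weights gated by thresholds. The standalone function here restates those thresholds so their interaction is easier to see. The name `ai_score` and the feature values passed to it are illustrative; they are not produced by the model:

```python
def ai_score(perplexity, comment_ratio, avg_var_name_length,
             descriptive_name_ratio, indent_consistency):
    """Restatement of the thresholds and weights used in detect_ai_generated."""
    score = 0.4 if perplexity < 50 else (0.2 if perplexity < 100 else 0.0)
    score += sum([
        0.2 if comment_ratio > 0.3 else 0.0,
        0.15 if avg_var_name_length > 8 else 0.0,
        0.15 if descriptive_name_ratio > 0.7 else 0.0,
        0.1 if indent_consistency > 0.9 else 0.0,
    ])
    return min(score, 1.0)

# Hypothetical feature values chosen to exercise the thresholds.
low_ppl_only = ai_score(30.0, 0.0, 4.0, 0.1, 0.5)
low_ppl_long_names = ai_score(30.0, 0.0, 12.0, 0.1, 0.5)

print(low_ppl_only, low_ppl_only > 0.6)
print(round(low_ppl_long_names, 2), low_ppl_long_names > 0.6)
```

Low perplexity on its own contributes 0.4, which stays below the 0.6 flag threshold, so a snippet is only marked `is_ai_generated` when pattern signals add to it.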


---

## Plagiarism Detection Logic

This section details the approach used for detecting plagiarism in code submissions:

- **Common Algorithm Patterns**: The detector checks submitted code against a set of well-known algorithmic patterns (e.g., quicksort, binary search, bubble sort, merge sort, recursive Fibonacci).
- **Hash-Based Matching**: Each pattern is pre-processed and stored as a hash for fast exact matching. If a code snippet matches a known pattern hash, it is flagged as an exact match.
- **Similarity Analysis**: For non-exact matches, the detector uses sequence similarity to compare code structure and logic, flagging submissions with high similarity scores.
- **Source Attribution**: When plagiarism is detected, the system reports the source labels stored with the matching pattern (e.g., StackOverflow, GitHub, LeetCode). No online lookup is performed.
- **Performance**: The logic is optimized for speed using caching and early termination, making it suitable for large-scale automated code review.

> This modular approach allows for easy extension with new patterns and sources, and can be integrated into backend services for real-time plagiarism detection.
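
As a minimal sketch of the two-stage matching described above, the following uses only the standard library. The `normalize` function copies the regexes from `normalize_for_comparison`; the helper name `fingerprint` and the sample submissions are assumptions made for illustration:

```python
import difflib
import hashlib
import re

def normalize(code):
    """Strip comments and triple-quoted strings, then collapse whitespace."""
    code = re.sub(r'#.*', '', code)
    code = re.sub(r'"""[\s\S]*?"""|\'\'\'[\s\S]*?\'\'\'', '', code)
    return re.sub(r'\s+', ' ', code).strip()

def fingerprint(text):
    # MD5 serves only as a fast fingerprint here, not as a security measure.
    return hashlib.md5(text.encode()).hexdigest()

known = 'def fibonacci(n): if n <= 1: return n return fibonacci(n-1) + fibonacci(n-2)'

submitted = '''
def fibonacci(n):
    # base case
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
'''
renamed = submitted.replace('fibonacci', 'fib')

# Stage 1: exact match on the hash of the normalized text.
print(fingerprint(normalize(submitted)) == fingerprint(known))  # True
print(fingerprint(normalize(renamed)) == fingerprint(known))    # False

# Stage 2: character-level similarity for anything that is not an exact match.
ratio = difflib.SequenceMatcher(None, normalize(renamed), known).ratio()
print(f"similarity after renaming: {ratio:.2f}")
```

Because the comparison is character-based, renaming identifiers lowers the similarity even though the logic is unchanged.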

In [6]:
class OptimizedPlagiarismDetector:
    """Optimized plagiarism detection with faster matching"""
    
    def __init__(self):
        self.embedder = None  # Lazy loading
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # Pre-compute hashes for faster lookup
        self.pattern_hashes = {}
        self.patterns_by_hash = {}
        self._load_common_snippets()
        
    def _load_common_snippets(self):
        """Optimized snippet loading with pre-computed hashes"""
        snippets = {
            'quicksort_basic': {
                'pattern': 'def quicksort(arr): if len(arr) <= 1: return arr',
                'sources': ['stackoverflow', 'github']
            },
            'fibonacci_recursive': {
                'pattern': 'def fibonacci(n): if n <= 1: return n return fibonacci(n-1) + fibonacci(n-2)',
                'sources': ['common_algorithm']
            },
            'binary_search': {
                'pattern': 'def binary_search(arr, target): left = 0 right = len(arr) - 1',
                'sources': ['leetcode', 'github']
            },
            'bubble_sort': {
                'pattern': 'def bubble_sort(arr): for i in range(len(arr)): for j in range(len(arr)-i-1):',
                'sources': ['stackoverflow']
            },
            'merge_sort': {
                'pattern': 'def merge_sort(arr): if len(arr) > 1: mid = len(arr) // 2',
                'sources': ['github', 'common_algorithm']
            }
        }
        
        # Pre-compute all hashes
        for name, info in snippets.items():
            pattern_hash = hashlib.md5(info['pattern'].encode()).hexdigest()
            self.pattern_hashes[name] = pattern_hash
            self.patterns_by_hash[pattern_hash] = {
                'name': name,
                'pattern': info['pattern'],
                'sources': info['sources']
            }
    
    @lru_cache(maxsize=2000)
    def normalize_for_comparison(self, code):
        """Strip comments and triple-quoted strings, then collapse whitespace (cached)"""
        # Regex normalization; a '#' inside a string literal is also treated as a comment
        code = re.sub(r'#.*', '', code)
        code = re.sub(r'"""[\s\S]*?"""|\'\'\'[\s\S]*?\'\'\'', '', code)
        code = re.sub(r'\s+', ' ', code).strip()
        return code
    
    def check_against_common_patterns(self, code):
        """Optimized pattern matching with early termination"""
        normalized_code = self.normalize_for_comparison(code)
        code_hash = hashlib.md5(normalized_code.encode()).hexdigest()
        
        # Fast exact match check (O(1) lookup)
        if code_hash in self.patterns_by_hash:
            pattern_info = self.patterns_by_hash[code_hash]
            return [{
                'pattern': pattern_info['name'],
                'similarity': 1.0,
                'sources': pattern_info['sources'],
                'match_type': 'exact'
            }]
        
        # Similarity check only if no exact match
        matches = []
        for pattern_hash, pattern_info in self.patterns_by_hash.items():
            # Fast length check before expensive similarity calculation
            len_diff = abs(len(normalized_code) - len(pattern_info['pattern']))
            if len_diff / max(len(normalized_code), len(pattern_info['pattern'])) > 0.3:
                continue  # Skip if lengths differ too much
            
            similarity = difflib.SequenceMatcher(
                None, 
                normalized_code, 
                pattern_info['pattern']
            ).ratio()
            
            if similarity > 0.8:
                matches.append({
                    'pattern': pattern_info['name'],
                    'similarity': similarity,
                    'sources': pattern_info['sources'],
                    'match_type': 'similar'
                })
        
        return matches
    
    def detect_online_plagiarism(self, code):
        """Optimized plagiarism detection"""
        matches = self.check_against_common_patterns(code)
        
        if matches:
            max_similarity = max(match['similarity'] for match in matches)
            return {
                'is_plagiarized': max_similarity > 0.85,
                'max_similarity': max_similarity,
                'matches': matches,
                'sources_found': list(set(source for match in matches for source in match['sources']))
            }
        
        return {
            'is_plagiarized': False,
            'max_similarity': 0.0,
            'matches': [],
            'sources_found': []
        }

---

## Code Analysis and Recommendation Engine

This section introduces the integrated code analysis engine, which combines AI-generated code detection and plagiarism detection to provide a comprehensive assessment of code submissions:

- **Parallel Processing**: The engine submits the AI and plagiarism checks to a thread pool so they run concurrently, which suits batch analysis and large datasets.
- **Suspiciousness Scoring**: Each code snippet is evaluated for signs of AI generation and plagiarism, with an overall suspiciousness score and detailed breakdown.
- **Actionable Recommendations**: Based on the analysis, the engine provides clear recommendations, such as flagging high-risk submissions, suggesting manual review, or confirming clean code.
- **Batch Support**: Multiple code samples can be analyzed simultaneously, with results aggregated for efficient review.
- **Cache Management**: Built-in cache clearing methods help manage memory and maintain performance during repeated or large-scale analysis.

> This modular recommendation engine can be integrated into automated code review systems, online judges, or educational platforms to enhance code integrity and fairness.
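
The concurrency pattern used by the engine can be sketched with stand-in functions. `check_ai` and `check_plagiarism` here are hypothetical placeholders for the two detectors, not the notebook's real checks:

```python
from concurrent.futures import ThreadPoolExecutor

def check_ai(code):
    # Hypothetical stand-in for detect_ai_generated.
    return {'check': 'ai', 'length': len(code)}

def check_plagiarism(code):
    # Hypothetical stand-in for detect_online_plagiarism.
    return {'check': 'plagiarism', 'lines': code.count('\n') + 1}

def analyze(code, executor):
    # Submit both checks, then block until each one finishes.
    future_ai = executor.submit(check_ai, code)
    future_plag = executor.submit(check_plagiarism, code)
    return future_ai.result(), future_plag.result()

codes = ['x = 1', 'def f():\n    return 2']

with ThreadPoolExecutor(max_workers=2) as executor:
    single = analyze(codes[0], executor)
    # executor.map yields results in input order, whatever order tasks finish in.
    batch = list(executor.map(check_ai, codes))

print(single)
print(batch)
```

The `with` block shuts the pool down when it exits. An executor created in `__init__` and kept on the instance stays alive until its `shutdown()` method is called.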

In [7]:

class OptimizedCodeChecker:
    """Optimized checker with parallel processing"""
    
    def __init__(self):
        self.ai_detector = OptimizedAIGeneratedCodeDetector()
        self.plagiarism_detector = OptimizedPlagiarismDetector()
        self.executor = ThreadPoolExecutor(max_workers=2)
    
    def analyze_code(self, code, parallel=True):
        """Analyze code with optional parallel processing"""
        
        if parallel:
            # Run AI detection and plagiarism detection in parallel
            future_ai = self.executor.submit(self.ai_detector.detect_ai_generated, code)
            future_plag = self.executor.submit(self.plagiarism_detector.detect_online_plagiarism, code)
            
            ai_result = future_ai.result()
            plagiarism_result = future_plag.result()
        else:
            ai_result = self.ai_detector.detect_ai_generated(code)
            plagiarism_result = self.plagiarism_detector.detect_online_plagiarism(code)
        
        overall_suspicious = (
            ai_result['is_ai_generated'] or 
            plagiarism_result['is_plagiarized']
        )
        
        return {
            'overall_suspicious': overall_suspicious,
            'ai_detection': ai_result,
            'plagiarism_detection': plagiarism_result,
            'recommendation': self._get_recommendation(ai_result, plagiarism_result)
        }
    
    def analyze_batch(self, codes):
        """Analyze several code strings concurrently; results keep input order"""
        results = []
        
        # Submit every snippet to a thread pool, then collect results in submission order
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(self.analyze_code, code, False) for code in codes]
            results = [future.result() for future in futures]
        
        return results
    
    def _get_recommendation(self, ai_result, plagiarism_result):
        """Fast recommendation generation"""
        if ai_result['is_ai_generated'] and plagiarism_result['is_plagiarized']:
            return "HIGH RISK: Code shows signs of both AI generation and plagiarism"
        elif ai_result['is_ai_generated']:
            return f"AI DETECTED: Code likely generated by AI (probability: {ai_result['ai_probability']:.2f})"
        elif plagiarism_result['is_plagiarized']:
            sources = ', '.join(plagiarism_result['sources_found'])
            return f"PLAGIARISM DETECTED: Code matches known sources ({sources})"
        elif ai_result['ai_probability'] > 0.4 or plagiarism_result['max_similarity'] > 0.6:
            return "MODERATE RISK: Some suspicious patterns detected, manual review recommended"
        else:
            return "CLEAN: No significant AI generation or plagiarism detected"
    
    def clear_cache(self):
        """Clear LRU caches to free memory"""
        self.ai_detector.calculate_perplexity.cache_clear()
        self.ai_detector.analyze_code_patterns.cache_clear()
        self.plagiarism_detector.normalize_for_comparison.cache_clear()



---

## Test Cases and Performance Metrics

This section provides a suite of test cases to validate the code analysis engine and benchmark its performance:

- **Test Coverage**: Includes examples of suspicious AI-like code, common algorithms (potential plagiarism), and original code to demonstrate detection capabilities.
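- **Single and Batch Analysis**: Measures the time taken for individual and batch code analysis. The batch pass re-analyzes snippets that the single pass has already cached, so the reported speedup reflects cache hits as well as parallel execution.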
- **Performance Reporting**: Outputs suspiciousness, AI probability, plagiarism status, and recommendations for each test case, along with timing metrics.
- **Cache Efficiency**: (Commented) Optionally tests cache performance for repeated analysis, showing the benefits of caching in large-scale scenarios.

> Use these tests to ensure the reliability and efficiency of the detection pipeline before deploying in production or integrating with automated review systems.
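
When reading the timings from this test, it helps to separate the cost of a first call from the cost of a cached call. The sketch here shows one way to measure that with `time.perf_counter` and `functools.lru_cache`. The function `slow_analysis` is a hypothetical stand-in for an expensive model call:

```python
import time
from functools import lru_cache

@lru_cache(maxsize=None)
def slow_analysis(code):
    # Hypothetical stand-in for an expensive model call.
    time.sleep(0.05)
    return len(code)

def timed(fn, *args):
    """Return the function result together with the elapsed wall-clock time."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start

_, cold = timed(slow_analysis, 'def f(): pass')
_, warm = timed(slow_analysis, 'def f(): pass')  # served from the cache

# Guard the ratio: a cached call can be too fast to measure reliably.
speedup = cold / warm if warm > 0 else float('inf')
print(f"cold={cold:.4f}s warm={warm:.6f}s speedup={speedup:.0f}x")
print(slow_analysis.cache_info())
```

Calling `cache_clear()` between timed passes, as `OptimizedCodeChecker.clear_cache` does for the detectors, is one way to compare the passes on equal footing.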

In [9]:
# Optimized testing with performance metrics
def test_optimized_checker():
    import time
    
    checker = OptimizedCodeChecker()
    
    test_codes = [
        {
            'name': 'Suspicious AI-like code',
            'code': '''
def calculate_fibonacci_sequence(number_of_terms):
    """
    Calculate fibonacci sequence up to n terms
    This function uses recursion to calculate fibonacci numbers
    """
    if number_of_terms <= 0:
        return "Please enter a positive integer"
    elif number_of_terms == 1:
        return 0
    elif number_of_terms == 2:
        return [0, 1]
    else:
        fibonacci_sequence = [0, 1]
        for i in range(2, number_of_terms):
            fibonacci_sequence.append(fibonacci_sequence[i-1] + fibonacci_sequence[i-2])
        return fibonacci_sequence
            '''
        },
        {
            'name': 'Common algorithm (potential plagiarism)',
            'code': '''
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
            '''
        },
        {
            'name': 'Original-looking code',
            'code': '''
def calc(x, y):
    z = x + y
    return z * 2
            '''
        }
    ]
    
    print("=== PERFORMANCE TEST ===\n")
    
    # Single analysis test
    print("Testing single code analysis...")
    start = time.time()
    for test in test_codes:
        result = checker.analyze_code(test['code'])
        print(f"\n{test['name']}:")
        print(f"  Suspicious: {result['overall_suspicious']}")
        print(f"  AI Probability: {result['ai_detection']['ai_probability']:.2f}")
        print(f"  Plagiarism: {result['plagiarism_detection']['is_plagiarized']}")
        print(f"  Recommendation: {result['recommendation']}")
    
    single_time = time.time() - start
    print(f"\nSingle analysis time: {single_time:.2f}s")
    
    # Batch analysis test
    print("\n=== Testing batch analysis (3 codes in parallel) ===")
    start = time.time()
    batch_results = checker.analyze_batch([test['code'] for test in test_codes])
    batch_time = time.time() - start
    print(f"Batch analysis time: {batch_time:.2f}s")
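    # Guard against a zero elapsed time when cached results return almost instantly
    if batch_time > 0:
        print(f"Speedup: {single_time/batch_time:.2f}x faster")
    else:
        print("Speedup: batch time too small to measure")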
    
    # # Cache performance test
    # print("\n=== Testing cache performance (re-analyzing same code) ===")
    # start = time.time()
    # for _ in range(3):
    #     checker.analyze_code(test_codes[0]['code'])
    # cached_time = time.time() - start
    # print(f"3 cached analyses time: {cached_time:.2f}s ({cached_time/3:.2f}s per analysis)")
    # print(f"Cache speedup: {single_time/(cached_time/3):.2f}x faster")

if __name__ == "__main__":
    test_optimized_checker()

=== PERFORMANCE TEST ===

Testing single code analysis...

Suspicious AI-like code:
  Suspicious: False
  AI Probability: 0.55
  Plagiarism: False
  Recommendation: MODERATE RISK: Some suspicious patterns detected, manual review recommended

Common algorithm (potential plagiarism):
  Suspicious: True
  AI Probability: 0.40
  Plagiarism: True
  Recommendation: PLAGIARISM DETECTED: Code matches known sources (common_algorithm)

Original-looking code:
  Suspicious: False
  AI Probability: 0.40
  Plagiarism: False
  Recommendation: CLEAN: No significant AI generation or plagiarism detected

Single analysis time: 0.01s

=== Testing batch analysis (3 codes in parallel) ===
Batch analysis time: 0.00s
Speedup: 1.35x faster


---

## Model Saving and Deployment

This section demonstrates how to save the pre-trained GPT-2 model and tokenizer in Hugging Face format for future use or production deployment. The cell loads the stock `gpt2` checkpoint and writes it to a local directory, which allows easy loading, sharing, and integration into backend services or cloud environments.

- **save_dir**: The directory where the model and tokenizer will be stored.
- **gpt2_model.save_pretrained(save_dir)**: Saves the model weights and configuration.
- **gpt2_tokenizer.save_pretrained(save_dir)**: Saves the tokenizer files for consistent preprocessing.

> After saving, you can reload the model and tokenizer using `from_pretrained(save_dir)` in any compatible environment.

**Tip:** Always version your saved models and document the training or fine-tuning process for reproducibility and auditability.
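
One possible way to follow the versioning tip is to give each saved model its own directory with a small metadata file. This is a sketch under assumptions: the helper names `versioned_save_dir` and `load_saved`, the directory layout, and the `metadata.json` file are all hypothetical and not part of the notebook or of the Transformers library. Only `from_pretrained` is the library's own call:

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path

def versioned_save_dir(base_dir, model_name, version):
    """Create <base_dir>/<model_name>/<version> and record basic metadata."""
    target = Path(base_dir) / model_name / version
    target.mkdir(parents=True, exist_ok=True)
    metadata = {
        'model_name': model_name,
        'version': version,
        'saved_at': datetime.now(timezone.utc).isoformat(),
    }
    (target / 'metadata.json').write_text(json.dumps(metadata, indent=2))
    return target

def load_saved(save_dir):
    """Reload a model and tokenizer previously written with save_pretrained."""
    # Imported lazily so the path helper works without transformers installed.
    from transformers import GPT2LMHeadModel, GPT2Tokenizer
    model = GPT2LMHeadModel.from_pretrained(save_dir)
    tokenizer = GPT2Tokenizer.from_pretrained(save_dir)
    return model, tokenizer

# Demonstrate the directory layout in a throwaway location.
with tempfile.TemporaryDirectory() as tmp:
    target = versioned_save_dir(tmp, 'gpt2', 'v1')
    print(target.name, sorted(p.name for p in target.iterdir()))
```

The returned path can be passed to `save_pretrained` in place of a fixed folder name, and later to `load_saved`.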

In [None]:
# Save the pre-trained GPT-2 model and tokenizer
save_dir = "../models"                 # choose a folder name
gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2")
gpt2_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Save everything in Hugging Face format
gpt2_model.save_pretrained(save_dir)
gpt2_tokenizer.save_pretrained(save_dir)

print(f"Model and tokenizer saved in {save_dir}")


Model and tokenizer saved in ../models
