<a href="https://colab.research.google.com/github/DeveloperPratim/sses_final/blob/main/Fixed%20GetData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers torch sentence-transformers



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import math
import re
from sentence_transformers import SentenceTransformer, util  # For semantic similarityimport math

# Global variables for models and tokenizers
model_qwen = None
tokenizer_qwen = None
sentence_transformer = None
models_loaded = False  # Flag to ensure models are loaded only once


def load_models():
    """
    Load global models and tokenizers.
    This function should be called only once.
    """
    global model_qwen, tokenizer_qwen, sentence_transformer, models_loaded

    if not models_loaded:  # Load models only if not already loaded
        print("Loading models...")

        # Load Qwen model
        drive_path_qwen = "/content/drive/MyDrive/sses/Qwen2.5-1.5B-Instruct"
        tokenizer_qwen = AutoTokenizer.from_pretrained(drive_path_qwen, local_files_only=True)
        model_qwen = AutoModelForCausalLM.from_pretrained(drive_path_qwen, local_files_only=True).to(torch.device("cpu"))

        # Load SentenceTransformer for semantic similarity (MiniLM model)
        drive_path_minilm = "/content/drive/MyDrive/sses/all-MiniLM-L6-v2"
        sentence_transformer = SentenceTransformer(drive_path_minilm, device="cpu")

        models_loaded = True
        print("Models loaded successfully.")
    else:
        print("Models are already loaded.")


def generate_response(question, student_answer, marks):
    """
    Generate a response using the Qwen model.

    :param question: The input question.
    :param student_answer: The input student's answer.
    :return: The generated text response.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Prepare the input text
    input_text = f"""
    Question: {question}
    Answer: {student_answer}
    Evaluate and provide these two fields **score** and **feedback** based on this answer where full marks is {marks} and can be given for the most precise and best answer, and 0 for wrong or irrelevant answers. Respond only with these two fields in JSON format.
    """

    # Tokenize the input
    input_ids = tokenizer_qwen.encode(input_text, return_tensors="pt").to(torch.device("cpu"))

    # Dynamically set the maximum number of new tokens
    max_allowed_tokens = model_qwen.config.max_position_embeddings  # Total token limit of the model
    input_token_count = len(input_ids[0])
    max_new_tokens = max(50, max_allowed_tokens - input_token_count)  # Minimum 50 new tokens

    # Generate the response
    output_ids = model_qwen.generate(
        input_ids,
        max_length=input_token_count + max_new_tokens,  # Total length: input + generated tokens
        num_return_sequences=1,
        no_repeat_ngram_size=2,  # Avoid repetition
        early_stopping=True,    # Stop when the output is complete
    )

    # Decode and return the generated output
    return tokenizer_qwen.decode(output_ids[0], skip_special_tokens=True)


def extract_score_and_feedback(generated_text):
    """
    Extract score and feedback from the generated text using regular expressions.

    :param generated_text: The raw output from the model.
    :return: A dictionary with "score" and "feedback".
    """
    score_pattern = r'"score":\s*([0-9.]+)'  # Matches numeric values for score
    feedback_pattern = r'"feedback":\s*"([^"]+)"'  # Matches the feedback string

    # Find matches
    score_match = re.search(score_pattern, generated_text)
    feedback_match = re.search(feedback_pattern, generated_text)

    # Extract values
    score = float(score_match.group(1)) if score_match else None  # Convert to float if match found
    feedback = feedback_match.group(1) if feedback_match else None  # Extract feedback text if match found

    return {"score": score, "feedback": feedback}


def calculate_semantic_similarity(student_answer, expected_answer):
    """
    Calculate the semantic similarity between the student's answer and the expected answer.

    :param student_answer: The student's answer.
    :param expected_answer: The expected correct answer.
    :return: The semantic similarity score.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Encode both answers using the SentenceTransformer model
    embeddings = sentence_transformer.encode([student_answer, expected_answer], convert_to_tensor=True)

    # Compute cosine similarity
    similarity_score = util.pytorch_cos_sim(embeddings[0], embeddings[1]).item()
    return similarity_score

"""

def GetData(question, student_answer, expected_answer, marks):
    '''
    Function to generate evaluation data based on the student's answer.

    :param question: The question asked in the exam.
    :param student_answer: The student's answer to the question.
    :param expected_answer: The expected correct answer for the question.
    :param marks: The full marks assigned to the question.

    :return: A dictionary containing the analysis result.
    '''
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Generate response using Qwen model
    generated_text = generate_response(question, student_answer, marks)
    # Extract score and feedback
    result = extract_score_and_feedback(generated_text)
    score = result['score']
    feedback = result['feedback']

    # Calculate semantic similarity between student's answer and expected answer
    similarity = calculate_semantic_similarity(student_answer, expected_answer)

    # Calculate final marks obtained based on score and semantic similarity
    similarity_weight = 0.5  # You can adjust this weight based on your preference
    final_marks_obtained = (score * 0.5) + (similarity * similarity_weight)

    # Ensure that final_marks_obtained does not exceed the total marks
    final_marks_obtained = min(final_marks_obtained, marks)

    # Return the result as a dictionary
    result_data = {
        "question": question,
        "student_answer": student_answer,
        "expected_answer": expected_answer,
        "marks": marks,
        "score": score,
        "feedback": feedback,
        "similarity": similarity,
        "final_marks_obtained": final_marks_obtained
    }

    return result_data



    # Weighted sum of score and similarity
    weighted_score = log_score * 0.5  # Adjust weight as needed
    weighted_similarity = log_similarity * 0.5  # Adjust weight as needed

    # Calculate an initial final marks based on the weighted sum
    final_marks_obtained = weighted_score + weighted_similarity
    final_marks_obtained = final_marks_obtained * marks  # Scale by full marks

    # Apply penalty: if the final marks exceed 90% of full marks, reduce them
    max_allowed_marks = marks * 0.9  # No student should get more than 90% of full marks
    if final_marks_obtained > max_allowed_marks:
        penalty_factor = 0.8  # Apply a 20% penalty
        final_marks_obtained = final_marks_obtained * penalty_factor

    # Ensure that the final marks are never below 0
    final_marks_obtained = max(final_marks_obtained, 0)

    final_marks_obtained = ( final_marks_obtained  + score) /2


    # Calculate percentages
    final_marks_percentage = round((final_marks_obtained / marks) * 100, 2) if marks > 0 else 0
    score_percentage = round((score / marks) * 100, 2) if marks > 0 else 0

    # Analyze the student's answer
    student_word_count = len(student_answer.split())  # Count words in the student's answer
    student_sentence_count = len(re.split(r'[.!?]+', student_answer.strip())) - 1  # Count sentences

    # Return the result as a dictionary
    result_data = {
        "question": question,
        "student_answer": student_answer,
        "expected_answer": expected_answer,
        "marks": marks,
        "score": score,
        "score_percentage": score_percentage,
        "feedback": feedback,
        "similarity": similarity,
        "similarity_percentage": similarity_percentage,
        "final_marks_obtained": final_marks_obtained,
        "final_marks_percentage": final_marks_percentage,
        "student_word_count": student_word_count,
        "student_sentence_count": student_sentence_count
    }


"""
'''

def GetData(question, student_answer, expected_answer, marks):
    """
    Function to generate evaluation data based on the student's answer.

    :param question: The question asked in the exam.
    :param student_answer: The student's answer to the question.
    :param expected_answer: The expected correct answer for the question.
    :param marks: The full marks assigned to the question.

    :return: A dictionary containing the analysis result.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Generate response using Qwen model
    generated_text = generate_response(question, student_answer, marks)
    # Extract score and feedback
    result = extract_score_and_feedback(generated_text)
    score = result['score']
    feedback = result['feedback']

    # Calculate semantic similarity between student's answer and expected answer
    similarity = calculate_semantic_similarity(student_answer, expected_answer)
    similarity_percentage = round(similarity * 100, 2)  # Convert similarity to percentage

    # Logarithmic scaling of score and similarity
    log_score = math.log(score + 1)  # Logarithm of score (score + 1 to avoid log(0))
    log_similarity = math.log(similarity + 1)  # Logarithm of similarity (similarity + 1 to avoid log(0))

    # Weighted sum of score and similarity
    weighted_score = log_score * 0.5  # Adjust weight as needed
    weighted_similarity = log_similarity * 0.5  # Adjust weight as needed

    # Calculate an initial final marks based on the weighted sum
    final_marks_obtained = weighted_score + weighted_similarity
    final_marks_obtained = final_marks_obtained * marks  # Scale by full marks

    # Apply penalty: if the final marks exceed 90% of full marks, reduce them
    max_allowed_marks = marks * 0.9  # No student should get more than 90% of full marks
    if final_marks_obtained > max_allowed_marks:
        penalty_factor = 0.8  # Apply a 20% penalty
        final_marks_obtained = final_marks_obtained * penalty_factor

    # Ensure that the final marks are never below 0
    final_marks_obtained = max(final_marks_obtained, 0)

    final_marks_obtained = (final_marks_obtained + score) / 2

    # Calculate percentages
    final_marks_percentage = round((final_marks_obtained / marks) * 100, 2) if marks > 0 else 0
    score_percentage = round((score / marks) * 100, 2) if marks > 0 else 0

    # Analyze the student's answer
    student_word_count = len(student_answer.split())  # Count words in the student's answer
    student_sentence_count = len(re.split(r'[.!?]+', student_answer.strip())) - 1  # Count sentences

    # Check for plagiarism
    results_plagiarism = plagiarism_checker(student_answer)

    # Detect AI-generated text
    result_AI = detect_ai_generated_text_advanced_v3(student_answer, threshold=0.7)

    # Return the result as a dictionary, including plagiarism and AI-detection data
    result_data = {
        "question": question,
        "student_answer": student_answer,
        "expected_answer": expected_answer,
        "marks": marks,
        "score": score,
        "score_percentage": score_percentage,
        "feedback": feedback,
        "similarity": similarity,
        "similarity_percentage": similarity_percentage,
        "final_marks_obtained": final_marks_obtained,
        "final_marks_percentage": final_marks_percentage,
        "student_word_count": student_word_count,
        "student_sentence_count": student_sentence_count,
        "plagiarism": results_plagiarism["plagiarism"],
        "average_score": results_plagiarism["average_score"],
        "max_score": results_plagiarism["max_score"],
        "max_score_url": results_plagiarism["max_score_url"],
        "plagiarism_data": results_plagiarism["data"],
        "AI_category": result_AI["category"],
        "AI_flag": result_AI["AI"],
        "AI_percentage": result_AI["AI_Percentage"],
        "AI_confidence": result_AI["confidence"],
        "AI_confidence_difference": result_AI["confidence_difference"],
        "AI_probabilities": result_AI["probabilities"],
        "AI_metrics": result_AI["metrics"],
        "AI_threshold_used": result_AI["threshold_used"],
    }

    return result_data
'''



import math
import re

def GetData(question, student_answer, expected_answer, marks):
    """
    Function to generate evaluation data based on the student's answer.

    :param question: The question asked in the exam.
    :param student_answer: The student's answer to the question.
    :param expected_answer: The expected correct answer for the question.
    :param marks: The full marks assigned to the question.

    :return: A dictionary containing the analysis result.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Ensure non-empty inputs for student_answer and question
    student_answer = student_answer or ""  # Default to empty string if None or empty
    expected_answer = expected_answer or ""  # Default to empty string if None or empty
    marks = marks or 0  # Default to 0 if marks are None or empty
    question = question or ""  # Default to empty string if None or empty

    # Generate response using Qwen model
    generated_text = generate_response(question, student_answer, marks)

    # Extract score and feedback
    result = extract_score_and_feedback(generated_text)
    score = result.get('score', 0)  # Default to 0 if score is None or not found
    feedback = result.get('feedback', "")  # Default to empty string if feedback is None or not found

    # Calculate semantic similarity between student's answer and expected answer
    similarity = calculate_semantic_similarity(student_answer, expected_answer) or 0  # Default to 0 if None
    similarity_percentage = round(similarity * 100, 2)  # Convert similarity to percentage

    # Logarithmic scaling of score and similarity
    log_score = math.log(score + 1) if score is not None else 0  # Logarithm of score (score + 1 to avoid log(0))
    log_similarity = math.log(similarity + 1) if similarity is not None else 0  # Logarithm of similarity (similarity + 1 to avoid log(0))

    # Weighted sum of score and similarity
    weighted_score = log_score * 0.5  # Adjust weight as needed
    weighted_similarity = log_similarity * 0.5  # Adjust weight as needed

    # Calculate an initial final marks based on the weighted sum
    final_marks_obtained = weighted_score + weighted_similarity
    final_marks_obtained = final_marks_obtained * marks  # Scale by full marks

    # Apply penalty: if the final marks exceed 90% of full marks, reduce them
    max_allowed_marks = marks * 0.9  # No student should get more than 90% of full marks
    if final_marks_obtained > max_allowed_marks:
        penalty_factor = 0.8  # Apply a 20% penalty
        final_marks_obtained = final_marks_obtained * penalty_factor

    # Ensure that the final marks are never below 0
    final_marks_obtained = max(final_marks_obtained, 0)

    final_marks_obtained = (final_marks_obtained + score) / 2

    # Calculate percentages
    final_marks_percentage = round((final_marks_obtained / marks) * 100, 2) if marks > 0 else 0
    score_percentage = round((score / marks) * 100, 2) if marks > 0 else 0

    # Analyze the student's answer
    student_word_count = len(student_answer.split())  # Count words in the student's answer
    student_sentence_count = len(re.split(r'[.!?]+', student_answer.strip())) - 1  # Count sentences

    # Check for plagiarism
    results_plagiarism = plagiarism_checker(student_answer) or {}

    # Detect AI-generated text
    result_AI = detect_ai_generated_text_advanced_v3(student_answer, threshold=0.7) or {}

    # Return the result as a dictionary, including plagiarism and AI-detection data
    result_data = {
        "question": question,
        "student_answer": student_answer,
        "expected_answer": expected_answer,
        "marks": marks,
        "score": score,
        "score_percentage": score_percentage,
        "feedback": feedback,
        "similarity": similarity,
        "similarity_percentage": similarity_percentage,
        "final_marks_obtained": final_marks_obtained,
        "final_marks_percentage": final_marks_percentage,
        "student_word_count": student_word_count,
        "student_sentence_count": student_sentence_count,
        "plagiarism": results_plagiarism.get("plagiarism", False),
        "average_score": results_plagiarism.get("average_score", 0.0),
        "max_score": results_plagiarism.get("max_score", 0.0),
        "max_score_url": results_plagiarism.get("max_score_url", ""),
        "plagiarism_data": results_plagiarism.get("data", ""),
        "AI_category": result_AI.get("category", "Unknown"),
        "AI_flag": result_AI.get("AI", False),
        "AI_percentage": result_AI.get("AI_Percentage", 0.0),
        "AI_confidence": result_AI.get("confidence", 0.0),
        "AI_confidence_difference": result_AI.get("confidence_difference", 0.0),
        "AI_probabilities": result_AI.get("probabilities", {}),
        "AI_metrics": result_AI.get("metrics", {}),
        "AI_threshold_used": result_AI.get("threshold_used", 0.0),
    }

    return result_data





import requests
from bs4 import BeautifulSoup
import hashlib
import json
import random
import nltk
from nltk.tokenize import sent_tokenize
from typing import List, Dict, Any

# Ensure NLTK resources are downloaded for sentence tokenization
try:
    nltk.data.find('tokenizers/punkt_tab')
except LookupError:
    print("Downloading 'punkt' package...")
    nltk.download('punkt_tab')

def get_useragent():
    _useragent_list = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.62',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0'
    ]
    return random.choice(_useragent_list)

def google_search(query):
    links = []
    try:
        headers = {"User-Agent": get_useragent()}
        params = {"q": f'"{query}"', "num": 3, "hl": 'en'}

        # Perform the Google search request
        response = requests.get("https://www.google.com/search", headers=headers, params=params, timeout=5)
        response.raise_for_status()  # Raises an error for bad responses

        # Parse the response content with BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')
        # Extract URLs from the search result blocks (avoiding ads, etc.)

        try:
            for result in soup.find_all('h3'):
                link = result.find_parent('a')
                if link:
                    link_text = link.get('href')
                    links.append(link_text)
        except:
            for link_tag in soup.select('div.yuRUbf a'):
                link = link_tag.get('href')
                if link:
                    links.append(link)


    except requests.RequestException as e:
        print(f"Error during the search request: {e}")

    return links

def fetch_web_content(url: str) -> str:
    """Fetch and clean the main content of a webpage."""
    try:
        headers = {"User-Agent": get_useragent()}
        response = requests.get(url, headers=headers, timeout=5)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        paragraphs = soup.find_all("p")
        return " ".join([para.get_text() for para in paragraphs])
    except requests.RequestException as e:
        print(f"Failed to fetch {url}: {e}")
        return ""

def get_shingles(text: str, k: int = 5) -> set:
    """Generate k-shingles (sets of k consecutive words) for a given text."""
    words = text.split()
    shingles = set()
    for i in range(len(words) - k + 1):
        shingle = " ".join(words[i:i + k])
        shingle_hash = hashlib.md5(shingle.encode("utf-8")).hexdigest()
        shingles.add(shingle_hash)
    return shingles

def similarity1(set1, set2):
    # Check if either set is empty to prevent division by zero
    if len(set1) == 0 or len(set2) == 0:
        return 0.0
    intersection = len(set1.intersection(set2))
    return (intersection / len(set1)) * 100

def calculate_jaccard_similarity(shingles1: set, shingles2: set) -> float:
    """Calculate Jaccard similarity between two sets of shingles."""
    intersection = shingles1.intersection(shingles2)
    union = shingles1.union(shingles2)

    # Avoid division by zero if both sets are empty
    if len(union) == 0:
        return 0.0

    return len(intersection) / len(union)

def check_sentence_plagiarism(sentence: str) -> Dict[str, Any]:
    """Check plagiarism for a single sentence by searching and comparing content."""
    result = {"sentence": sentence, "matches": []}
    urls = google_search(sentence)

    for url in urls:
        content = fetch_web_content(url)
        if content:
            original_shingles = get_shingles(sentence)
            content_shingles = get_shingles(content)
            # similarity = calculate_jaccard_similarity(original_shingles, content_shingles)
            similarity = similarity1(original_shingles, content_shingles)
            result["matches"].append({"url": url, "score": similarity})

    # Sort matches by score in descending order and take the highest
    if result["matches"]:
        result["matches"].sort(key=lambda x: x["score"], reverse=True)
        highest_match = result["matches"][0]
        result["highest_match"] = {"url": highest_match["url"], "score": highest_match["score"]}
    else:
        result["highest_match"] = {"url": None, "score": 0.0}

    return result

def plagiarism_checker(text: str) -> List[Dict[str, Any]]:
    """Run plagiarism check on each sentence in the input text."""
    sentences = sent_tokenize(text)
    results = [check_sentence_plagiarism(sentence) for sentence in sentences]
    return get_score(results)

def get_score(data):
    # Merging sentences by URL and score
    result = []
    total_score = 0
    max_score = 0
    max_score_url = ""
    total_sentences = len(data)

    for entry in data:
        url = entry['highest_match']['url']
        score = entry['highest_match']['score']
        sentence = entry['sentence']

        # Check if score is greater than 40 for plagiarism flag
        plagiarism = True if score > 40 else False

        # Add score to total score
        total_score += score

        # Track maximum score and corresponding URL
        if score > max_score:
            max_score = score
            max_score_url = url

        # Check if we should merge with the last entry
        if result and result[-1]['url'] == url and result[-1]['score'] == score:
            result[-1]['sentence'] += " " + sentence  # Append the sentence
        else:
            # Add a new entry with plagiarism flag
            result.append({
                'sentence': sentence,
                'url': url,
                'score': score,
                'plagiarism': plagiarism
            })

    # Calculate average score
    average_score = total_score / total_sentences if total_sentences else 0

    # Create the final structured response
    response = {
        "plagiarism": any(entry['plagiarism'] for entry in result),
        "average_score": average_score,
        "max_score": max_score,
        "max_score_url": max_score_url,
        "data": result
    }

    return response


# input_text = """
# An Operating System can be defined as an interface between user and hardware. It is responsible for the execution of all the processes, Resource Allocation, CPU ..."""
# results_plagiarism= plagiarism_checker(input_text)
# print(json.dumps(results_plagiarism, indent=2))




from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np

def detect_ai_generated_text_advanced_v3(
    text,
    model_name="roberta-base-openai-detector",
    threshold=0.5,
    max_length=512
):
    """
    Advanced detection of AI-generated text with detailed metrics, AI flag, and percentage.

    Parameters:
        text (str): Input text to analyze.
        model_name (str): Pre-trained model to use for detection.
        threshold (float): Confidence threshold for categorization (default: 0.5).
        max_length (int): Maximum length for tokenization (default: 512).

    Returns:
        dict: Detailed output with categorization, confidence, metrics, AI flag, and percentage.
    """
    try:
        # Load tokenizer and model
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSequenceClassification.from_pretrained(model_name)

        # Tokenize and prepare text
        tokens = tokenizer(text, return_tensors="pt", truncation=True, max_length=max_length)
        token_count = len(tokens["input_ids"][0])

        # Inference
        outputs = model(**tokens)
        probabilities = torch.softmax(outputs.logits, dim=1).detach().numpy()[0]
        ai_confidence = probabilities[1]
        human_confidence = probabilities[0]
        category = "AI-generated" if ai_confidence > threshold else "Human-written"

        # AI flag and percentage
        is_ai = ai_confidence > threshold
        ai_percentage = round(ai_confidence * 100, 2)

        # Additional metrics
        char_count = len(text)
        word_count = len(text.split())
        avg_word_length = char_count / word_count if word_count > 0 else 0
        confidence_diff = abs(ai_confidence - human_confidence)

        return {
            "category": category,
            "AI": is_ai,
            "AI_Percentage": ai_percentage,
            "confidence": ai_confidence if is_ai else human_confidence,
            "confidence_difference": confidence_diff,
            "probabilities": {
                "Human-written": human_confidence,
                "AI-generated": ai_confidence
            },
            "metrics": {
                "character_count": char_count,
                "word_count": word_count,
                "average_word_length": round(avg_word_length, 2),
                "token_count": token_count,
                "max_token_length": max_length
            },
            "threshold_used": threshold
        }
    except Exception as e:
        return {"error": str(e)}

# Example usage
#text = "In a world driven by rapid technological advancement, artificial intelligence has emerged as a transformative force. From revolutionizing healthcare with predictive diagnostics to enhancing efficiency in industries through automation, AI continues to redefine the boundaries of what is possible. However, this unprecedented growth also poses ethical dilemmas, emphasizing the need for responsible innovation to ensure these technologies benefit humanity as a whole."
#result_AI = detect_ai_generated_text_advanced_v3(text, threshold=0.7)
#print(result)
#print()

def convert_numpy_types(obj):
    if isinstance(obj, np.generic):  # Check if it’s a numpy scalar
        return obj.item()  # Convert to native Python type
    return obj  # Leave other types unchanged

# load models for first time
load_models()  # Load models once

Loading models...
Models loaded successfully.


In [None]:

import json
question = "What is the purpose of a file system?"
student_answer = "An operating system (OS) is system software that manages computer hardware and software resources, and provides common services for computer programs."
expected_answer = "A file system organizes and controls how data is stored and retrieved on a computer system."
marks = 5  # Full marks for the question

# Get the evaluation data
evaluation_result = GetData(question, student_answer, expected_answer, marks)

result_json = json.dumps(evaluation_result, indent=4, default=convert_numpy_types)
print(result_json)

The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of the model checkpoint at roberta-base-openai-detector were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.

{
    "question": "What is the purpose of a file system?",
    "student_answer": "An operating system (OS) is system software that manages computer hardware and software resources, and provides common services for computer programs.",
    "expected_answer": "A file system organizes and controls how data is stored and retrieved on a computer system.",
    "marks": 5,
    "score": 1.0,
    "score_percentage": 20.0,
    "feedback": "The provided answer does not directly address the question about the primary purpose(s) of an operating systems (operating system). The answer focuses more on explaining what an OS is rather than its main function. It correctly identifies the definition but misses out on the central concept asked for.",
    "similarity": 0.4761287569999695,
    "similarity_percentage": 47.61,
    "final_marks_obtained": 1.8532126708417516,
    "final_marks_percentage": 37.06,
    "student_word_count": 21,
    "student_sentence_count": 1,
    "plagiarism": false,
    "average_s

Bellow is the working code

In [None]:

import pandas as pd
import json
import asyncio

# Load the Excel file
file_path = '/content/drive/MyDrive/sses/cleaned_lsngjlxe_sses.xlsx'
df = pd.read_excel(file_path)

# Define the starting row
start_row = 80  # 81st row in Excel (0-based indexing)

# Define the output file path
output_file_path = '/content/drive/MyDrive/sses/output_results.xlsx'

# Load existing results if available
try:
    existing_df = pd.read_excel(output_file_path)
    print("Existing results loaded.")
except FileNotFoundError:
    # Initialize an empty DataFrame with the necessary columns
    existing_df = pd.DataFrame(columns=df.columns.tolist() + ['evaluation_result_json'])
    print("Output file not found. Starting fresh.")

# Async function to save DataFrame
async def save_to_drive(df, file_path):
    await asyncio.to_thread(df.to_excel, file_path, index=False)
    print(f"Data saved to {file_path}.")

# Main processing function
async def process_rows():
    global existing_df  # Ensure we modify the global variable
    for index, row in df.iloc[start_row:].iterrows():
        question = row['question_text']
        student_answer = row['student_answer']
        full_marks = row['full_marks']
        expected_answer = ""  # As expected_answer is not present, send an empty value

        try:
            # Call the GetData function
            evaluation_result = GetData(question, student_answer, expected_answer, full_marks)
            result_json = json.dumps(evaluation_result, indent=4, default=convert_numpy_types)

            # Add the evaluation result to the current row
            row_data = row.to_dict()
            row_data['evaluation_result_json'] = result_json

            # Convert the row to a DataFrame and append to the existing DataFrame
            new_row_df = pd.DataFrame([row_data])
            existing_df = pd.concat([existing_df, new_row_df], ignore_index=True)

            # Save to Google Drive asynchronously
            await save_to_drive(existing_df, output_file_path)

            # Print success message and JSON response
            print(f"Row {index + 1} processed and saved:\n{result_json}\n")

        except Exception as e:
            print(f"Error processing row {index + 1}: {e}")
            continue

    print("All rows processed and saved.")

# Run the async processing function
await process_rows()

This is temp

In [None]:

import pandas as pd
import json
import asyncio
from time import time

# File paths
input_file_path = '/content/drive/MyDrive/sses/cleaned_lsngjlxe_sses.xlsx'
output_file_path = '/content/drive/MyDrive/sses/output_results.xlsx'

# Parameters
start_row = 80  # Start processing from row 81 (0-based index)
retry_attempts = 3  # Number of retries for GetData
save_interval = 1  # Save after each row

# Load the input file
df = pd.read_excel(input_file_path)

# Load existing results if available
try:
    existing_df = pd.read_excel(output_file_path)
    print("Existing results loaded.")
    last_saved_row = len(existing_df)
except FileNotFoundError:
    existing_df = pd.DataFrame(columns=df.columns.tolist() + ['evaluation_result_json'])
    print("Output file not found. Starting fresh.")
    last_saved_row = 0

# Initialize buffer for new rows
new_rows = []

# Async function to save progress
async def save_progress():
    global last_saved_row
    if new_rows:
        df_to_save = pd.DataFrame(new_rows)
        df_to_save.to_excel(output_file_path, mode='a', header=False, index=False)
        new_rows.clear()
        last_saved_row += len(df_to_save)
        print(f"Last saved row in drive: {last_saved_row}")

# Main processing function
async def process_rows():
    global existing_df, new_rows

    for index, row in df.iloc[start_row:].iterrows():
        start_time = time()
        print(f"Currently processing row: {index + 1}")
        question = row['question_text']
        student_answer = row['student_answer']
        full_marks = row['full_marks']
        expected_answer = ""  # No expected answer

        # Retry mechanism for GetData
        for attempt in range(retry_attempts):
            try:
                # Call GetData
                evaluation_result = GetData(question, student_answer, expected_answer, full_marks)

                # Add the evaluation result to the row
                row_data = row.to_dict()
                row_data['evaluation_result_json'] = json.dumps(evaluation_result, default=str)
                new_rows.append(row_data)

                # Save progress after each row
                if len(new_rows) >= save_interval:
                    await asyncio.to_thread(save_progress)

                print(f"Row {index + 1} processed in {time() - start_time:.2f} seconds.")
                break  # Exit retry loop on success

            except Exception as e:
                print(f"Error processing row {index + 1}, attempt {attempt + 1}: {e}")
                if attempt == retry_attempts - 1:
                    print(f"Skipping row {index + 1} after {retry_attempts} attempts.")
                else:
                    await asyncio.sleep(0.5)  # Short delay before retrying
                continue

    # Final save for remaining rows
    await asyncio.to_thread(save_progress)
    print("All rows processed and saved.")

# Run the async processing function
await process_rows()

The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Existing results loaded.
Currently processing row: 81


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/624 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/501M [00:00<?, ?B/s]

Some weights of the model checkpoint at roberta-base-openai-detector were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Row 81 processed in 50.17 seconds.
Currently processing row: 82


Some weights of the model checkpoint at roberta-base-openai-detector were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  handle = self._ready.popleft()


Row 82 processed in 38.62 seconds.
Currently processing row: 83


2nd temp

In [None]:

import pandas as pd
import json
import asyncio
from time import time
import numpy as np

# File paths
input_file_path = '/content/drive/MyDrive/sses/cleaned_lsngjlxe_sses.xlsx'
output_file_path = '/content/drive/MyDrive/sses/output_results.xlsx'

# Parameters
start_row = 80  # Start processing from row 81 (0-based index)
retry_attempts = 3  # Number of retries for GetData
save_interval = 1  # Save after each row

# Load the input file
df = pd.read_excel(input_file_path)

# Load existing results if available
try:
    existing_df = pd.read_excel(output_file_path)
    print("Existing results loaded.")
    last_saved_row = len(existing_df)
except FileNotFoundError:
    existing_df = pd.DataFrame(columns=df.columns.tolist() + ['evaluation_result_json'])
    print("Output file not found. Starting fresh.")
    last_saved_row = 0

# Initialize buffer for new rows
new_rows = []

# Function to handle NumPy data types in JSON
def convert_numpy_types(obj):
    if isinstance(obj, (np.integer, int)):
        return int(obj)
    elif isinstance(obj, (np.floating, float)):
        return float(obj)
    elif isinstance(obj, (np.bool_, bool)):
        return bool(obj)
    elif obj is None:
        return None
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

# Async function to save progress
async def save_progress():
    global last_saved_row
    if new_rows:
        df_to_save = pd.DataFrame(new_rows)
        df_to_save.to_excel(output_file_path, mode='a', header=False, index=False)
        new_rows.clear()
        last_saved_row += len(df_to_save)
        print(f"Last saved row in drive: {last_saved_row}")

# Main processing function
async def process_rows():
    global existing_df, new_rows

    for index, row in df.iloc[start_row:].iterrows():
        start_time = time()
        print(f"\nCurrently processing row: {index + 1}")
        question = row['question_text']
        student_answer = row['student_answer']
        full_marks = row['full_marks']
        expected_answer = ""  # No expected answer

        # Retry mechanism for GetData
        for attempt in range(retry_attempts):
            try:
                # Call GetData
                evaluation_result = GetData(question, student_answer, expected_answer, full_marks)

                # Add the evaluation result to the row
                row_data = row.to_dict()
                row_data['evaluation_result_json'] = json.dumps(evaluation_result, default=convert_numpy_types)
                new_rows.append(row_data)

                # Display processing result
                print(f"Processing result (JSON): {json.dumps(evaluation_result, indent=4, default=convert_numpy_types)}")

                # Save progress after each row
                if len(new_rows) >= save_interval:
                    await asyncio.to_thread(save_progress)

                print(f"Row {index + 1} processed in {time() - start_time:.2f} seconds.")
                break  # Exit retry loop on success

            except Exception as e:
                print(f"Error processing row {index + 1}, attempt {attempt + 1}: {e}")
                if attempt == retry_attempts - 1:
                    print(f"Skipping row {index + 1} after {retry_attempts} attempts.")
                else:
                    await asyncio.sleep(0.5)  # Short delay before retrying
                continue

    # Final save for remaining rows
    await asyncio.to_thread(save_progress)
    print("All rows processed and saved.")

# Run the async processing function
await process_rows()

Existing results loaded.

Currently processing row: 81


Some weights of the model checkpoint at roberta-base-openai-detector were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Processing result (JSON): {
    "question": "Q7. Caesar Cipher Decryption: Decrypt the cipher text \"LQIRUPDWLRQ VHFULWB\", which was encrypted using a Caesar Cipher with a shift of 3. (2 Marks)_x000D_\n",
    "student_answer": "\nInformation security",
    "expected_answer": "",
    "marks": 2.0,
    "score": 1.5,
    "score_percentage": 75.0,
    "feedback": "The answer correctly decrypts the Caesar cipher but could use more details about the encryption process and steps taken to arrive at the solution.",
    "similarity": 0.12042701244354248,
    "similarity_percentage": 12.04,
    "final_marks_obtained": 1.2650003028149577,
    "final_marks_percentage": 63.25,
    "student_word_count": 2,
    "student_sentence_count": 0,
    "plagiarism": false,
    "average_score": 0.0,
    "max_score": 0,
    "max_score_url": "",
    "plagiarism_data": [
        {
            "sentence": "\nInformation security",
            "url": null,
            "score": 0.0,
            "plagiarism": false
 

Some weights of the model checkpoint at roberta-base-openai-detector were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  handle = None  # Needed to break cycles when an exception occurs.


Processing result (JSON): {
    "question": "Q8. In an RSA cryptosystem, a particular A uses two prime numbers p = 17 and q =11 to generate her\npublic and private keys. If the public key (e) of A is 7. Then the private key (d) of A is? (4 Marks)\n",
    "student_answer": "\nAnswer= 23",
    "expected_answer": "",
    "marks": 4.0,
    "score": 3.0,
    "score_percentage": 75.0,
    "feedback": null,
    "similarity": 0.2596970200538635,
    "similarity_percentage": 25.97,
    "final_marks_obtained": 3.1171655928933584,
    "final_marks_percentage": 77.93,
    "student_word_count": 2,
    "student_sentence_count": 0,
    "plagiarism": false,
    "average_score": 0.0,
    "max_score": 0,
    "max_score_url": "",
    "plagiarism_data": [
        {
            "sentence": "\nAnswer= 23",
            "url": null,
            "score": 0.0,
            "plagiarism": false
        }
    ],
    "AI_category": "Human-written",
    "AI_flag": false,
    "AI_percentage": 11.66,
    "AI_confidence

In [None]:
import pandas as pd
import json
import asyncio
from time import time

# File paths
input_file_path = '/content/drive/MyDrive/sses/cleaned_lsngjlxe_sses.xlsx'
output_file_path = '/content/drive/MyDrive/sses/output_results.xlsx'

# Parameters
start_row = 80  # Start processing from row 81 (0-based index)
retry_attempts = 3  # Number of retries for GetData

# Load the input file
df = pd.read_excel(input_file_path)

# Load existing results if available
try:
    existing_df = pd.read_excel(output_file_path)
    print("Existing results loaded.")
    last_saved_row = len(existing_df)
except FileNotFoundError:
    existing_df = pd.DataFrame(columns=df.columns.tolist() + ['evaluation_result_json'])
    print("Output file not found. Starting fresh.")
    last_saved_row = 0

# Function to save progress after processing a row
async def save_progress(new_row_data):
    global last_saved_row, existing_df, output_file_path

    # Convert the new row data to a DataFrame
    new_row_df = pd.DataFrame([new_row_data])

    # Append the new row to the existing DataFrame
    existing_df = pd.concat([existing_df, new_row_df], ignore_index=True)

    # Save the entire DataFrame back to the file
    existing_df.to_excel(output_file_path, index=False)

    # Update the last saved row
    last_saved_row += 1
    print(f"Row {last_saved_row} saved to drive.")

# Main processing function
async def process_rows():
    global existing_df

    for index, row in df.iloc[start_row:].iterrows():
        start_time = time()
        print(f"Currently processing row: {index + 1}")
        question = row['question_text']
        student_answer = row['student_answer']
        full_marks = row['full_marks']
        expected_answer = ""  # No expected answer

        # Retry mechanism for GetData
        for attempt in range(retry_attempts):
            try:
                # Call GetData
                evaluation_result = GetData(question, student_answer, expected_answer, full_marks)

                # Add the evaluation result to the row
                row_data = row.to_dict()
                row_data['evaluation_result_json'] = json.dumps(evaluation_result, default=str)

                # Save progress after processing the row
                await save_progress(row_data)

                # Print result
                print(f"Row {index + 1} processed and saved in {time() - start_time:.2f} seconds.")
                print(f"Result JSON:\n{json.dumps(evaluation_result, indent=4)}\n")
                break  # Exit retry loop on success

            except Exception as e:
                print(f"Error processing row {index + 1}, attempt {attempt + 1}: {e}")
                if attempt == retry_attempts - 1:
                    print(f"Skipping row {index + 1} after {retry_attempts} attempts.")
                else:
                    await asyncio.sleep(0.5)  # Short delay before retrying
                continue

    print("All rows processed and saved.")

# Run the async processing function
await process_rows()

In [17]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import re
import math
from sentence_transformers import SentenceTransformer, util

# Global variables for models and tokenizers
model_qwen = None
tokenizer_qwen = None
sentence_transformer = None
models_loaded = False  # Flag to ensure models are loaded only once


def load_models():
    """
    Load global models and tokenizers.
    This function should be called only once.
    """
    global model_qwen, tokenizer_qwen, sentence_transformer, models_loaded

    if not models_loaded:  # Load models only if not already loaded
        print("Loading models...")

        # Load Qwen model
        drive_path_qwen = "/content/drive/MyDrive/sses/Qwen2.5-1.5B-Instruct"
        tokenizer_qwen = AutoTokenizer.from_pretrained(drive_path_qwen, local_files_only=True)
        model_qwen = AutoModelForCausalLM.from_pretrained(drive_path_qwen, local_files_only=True).to(torch.device("cpu"))

        # Load SentenceTransformer for semantic similarity (MiniLM model)
        drive_path_minilm = "/content/drive/MyDrive/sses/all-MiniLM-L6-v2"
        sentence_transformer = SentenceTransformer(drive_path_minilm, device="cpu")

        models_loaded = True
        print("Models loaded successfully.")
    else:
        print("Models are already loaded.")


def extract_score_and_feedback(generated_text):
    """
    Extract score and feedback from the generated text using regular expressions.

    :param generated_text: The raw output from the model.
    :return: A dictionary with "score" and "feedback".
    """
    score_pattern = r'"score":\s*([0-9.]+)'  # Matches numeric values for score
    feedback_pattern = r'"feedback":\s*"([^"]+)"'  # Matches the feedback string

    # Find matches
    score_match = re.search(score_pattern, generated_text)
    feedback_match = re.search(feedback_pattern, generated_text)

    # Extract values
    score = float(score_match.group(1)) if score_match else None  # Convert to float if match found
    feedback = feedback_match.group(1) if feedback_match else None  # Extract feedback text if match found

    return {"score": score, "feedback": feedback}


def calculate_semantic_similarity(student_answer, expected_answer):
    """
    Calculate the semantic similarity between the student's answer and the expected answer.

    :param student_answer: The student's answer.
    :param expected_answer: The expected correct answer.
    :return: The semantic similarity score.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Encode both answers using the SentenceTransformer model
    embeddings = sentence_transformer.encode([student_answer, expected_answer], convert_to_tensor=True)

    # Compute cosine similarity
    similarity_score = util.pytorch_cos_sim(embeddings[0], embeddings[1]).item()
    return similarity_score


def GetData(question, student_answer, expected_answer, marks):
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    question = question or ""
    student_answer = student_answer or ""
    expected_answer = expected_answer or ""
    marks = marks or 0

    generated_text = generate_response(question, student_answer, marks)
    result = extract_score_and_feedback(generated_text)
    score = max(result.get('score', 0), 0)

    similarity = max(calculate_semantic_similarity(student_answer, expected_answer) or 0, 0)
    similarity_percentage = round(similarity * 100, 2)

    log_score = math.log1p(score)
    log_similarity = math.log1p(similarity)
    weighted_score = (log_score * 0.5 + log_similarity * 0.5) * marks

    max_allowed_marks = marks * 0.9
    final_marks_obtained = min(weighted_score, max_allowed_marks)
    final_marks_obtained = max(final_marks_obtained, 0)
    final_marks_obtained = (final_marks_obtained + score) / 2

    return {
        "question": question,
        "student_answer": student_answer,
        "expected_answer": expected_answer,
        "marks": marks,
        "score": score,
        "final_marks_obtained": round(final_marks_obtained, 2),
        "similarity_percentage": similarity_percentage,
    }

In [18]:
# Load models
load_models()

# Define sample inputs for testing
question = "What is a process in an operating system?"
student_answer = "A process is a program that is running on a computer."
expected_answer = "A process is a program that is being executed by the operating system. It consists of the program code, current activity, and resources."
marks = 10

# Test GetData function
result = GetData(question, student_answer, expected_answer, marks)

# Print the result
print(result)

Loading models...
Models loaded successfully.


TypeError: '>' not supported between instances of 'int' and 'NoneType'

In [21]:
# Test the model's response directly
question = "What is a process in operating systems?"
student_answer = "A process is a program in execution."
marks = 5

def generate_response(question, student_answer, marks):
    """
    Generate a response using the Qwen model.

    :param question: The input question.
    :param student_answer: The input student's answer.
    :return: The generated text response.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Prepare the input text
    input_text = f"""
    Question: {question}
    Answer: {student_answer}
    Evaluate and provide these two fields **score** and **feedback** based on this answer where full marks is {marks} and can be given for the most precise and best answer, and 0 for wrong or irrelevant answers. Respond only with these two fields in JSON format.
    """

    # Tokenize the input
    input_ids = tokenizer_qwen.encode(input_text, return_tensors="pt").to(torch.device("cpu"))

    # Dynamically set the maximum number of new tokens
    max_allowed_tokens = model_qwen.config.max_position_embeddings  # Total token limit of the model
    input_token_count = len(input_ids[0])
    max_new_tokens = max(50, max_allowed_tokens - input_token_count)  # Minimum 50 new tokens

    # Generate the response
    output_ids = model_qwen.generate(
        input_ids,
        max_length=input_token_count + max_new_tokens,  # Total length: input + generated tokens
        num_return_sequences=1,
        no_repeat_ngram_size=2,  # Avoid repetition
        early_stopping=True,    # Stop when the output is complete
    )

    # Decode and return the generated output
    return tokenizer_qwen.decode(output_ids[0], skip_special_tokens=True)


# Generate response using the model
generated_text = generate_response(question, student_answer, marks)

# Print the generated response
print(generated_text)




    Question: What is a process in operating systems?
    Answer: A process is a program in execution.
    Evaluate and provide these two fields **score** and **feedback** based on this answer where full marks is 5 and can be given for the most precise and best answer, and 0 for wrong or irrelevant answers. Respond only with these two fields in JSON format.
     ```json
     {"score": 3, "feedback": "The answer is partially correct as it defines a 'process' but does not specify that it's in 'execution'. However, it correctly identifies processes as 'program(s) in operation', which aligns closely with the question."}
     ```
     The above output will update your score and feedback to reflect the evaluation of the provided response.

```json 
{"score" : 2, 
"feedback" :"The definition provided is accurate but lacks specificity regarding the context (e.g., user interface, database operations). It could benefit from clarifying if we are referring to multi-threaded or single-thread proce

In [22]:
def generate_response(question, student_answer, marks):
    """
    Generate a response using the Qwen model.

    :param question: The input question.
    :param student_answer: The input student's answer.
    :return: The generated text response.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Prepare the input text
    input_text = f"""
    Question: {question}
    Answer: {student_answer}
    Evaluate and provide these two fields **score** and **feedback** based on this answer where full marks is {marks} and can be given for the most precise and best answer, and 0 for wrong or irrelevant answers. Respond only with these two fields in JSON format.
    """

    # Tokenize the input
    input_ids = tokenizer_qwen.encode(input_text, return_tensors="pt").to(torch.device("cpu"))

    # Dynamically set the maximum number of new tokens
    max_allowed_tokens = model_qwen.config.max_position_embeddings  # Total token limit of the model
    input_token_count = len(input_ids[0])
    max_new_tokens = max(50, max_allowed_tokens - input_token_count)  # Minimum 50 new tokens

    # Generate the response (only 1 sequence)
    output_ids = model_qwen.generate(
        input_ids,
        max_length=input_token_count + max_new_tokens,  # Total length: input + generated tokens
        num_return_sequences=1,  # Ensure only 1 response
        no_repeat_ngram_size=2,  # Avoid repetition
        early_stopping=False,    # Remove early stopping warning
    )

    # Decode and return the generated output
    return tokenizer_qwen.decode(output_ids[0], skip_special_tokens=True)

In [34]:

question = "What is a process in operating systems?"
student_answer = """
A process is a program that is in execution. A process can be defined as a sequence of instructions that are executed by the CPU.
A program, in its static form, is a collection of instructions, whereas a process refers to the program in action.
Each process has its own memory space and system resources, and it is managed by the operating system. The operating system
allocates resources to the process and manages its execution through process scheduling. Processes are fundamental
to the operation of a computer system, enabling multitasking and allowing multiple programs to run simultaneously.
Each process consists of a program counter, registers, and a stack, which together allow the operating system to keep track
of the state of the process. Processes can be classified as either user-level processes or kernel-level processes,
depending on whether they interact directly with the user or operate in the background of the system.
In multi-threaded applications, a single process can have multiple threads, which are smaller units of execution
that share the same memory space. Processes also communicate with each other via inter-process communication (IPC)
mechanisms, such as message passing or shared memory. As a process executes, it may transition through various states,
including ready, running, waiting, and terminated, depending on its interaction with the system and the availability of resources.
"""
marks = 5

# Generate response using the model
generated_text = generate_response(question, student_answer, marks)

# Print the generated response
print(generated_text)

{"score": 2, "feedback": "The explanation provides accurate definitions but could benefit from more detailed examples to enhance understanding."}


In [32]:


def generate_response(question, student_answer, marks):
    """
    Generate a response using the Qwen model.

    :param question: The input question.
    :param student_answer: The input student's answer.
    :return: The generated text response.
    """
    if not models_loaded:
        raise RuntimeError("Models are not loaded. Call `load_models()` first.")

    # Prepare the input text
    input_text = f"""
    Question: {question}
    Answer: {student_answer}
    Evaluate and provide these two fields **score** and **feedback** based on this answer where full marks is {marks} and can be given for the most precise and best answer, and 0 for wrong or irrelevant answers. Respond only with these two fields in JSON format.
    """

    # Tokenize the input
    input_ids = tokenizer_qwen.encode(input_text, return_tensors="pt").to(torch.device("cpu"))

    # Dynamically set the maximum number of new tokens
    max_allowed_tokens = model_qwen.config.max_position_embeddings  # Total token limit of the model
    input_token_count = len(input_ids[0])
    max_new_tokens = max(50, max_allowed_tokens - input_token_count)  # Minimum 50 new tokens

    # Generate the response (only 1 sequence)
    output_ids = model_qwen.generate(
        input_ids,
        max_length=input_token_count + max_new_tokens,  # Total length: input + generated tokens
        num_return_sequences=1,  # Ensure only 1 response
        no_repeat_ngram_size=2,  # Avoid repetition
        early_stopping=False,    # Remove early stopping warning
    )

    # Decode and return the generated output
    generated_text = tokenizer_qwen.decode(output_ids[0], skip_special_tokens=True)

    # Extract only the JSON part
    try:
        start_idx = generated_text.index("{")
        end_idx = generated_text.rindex("}") + 1
        return generated_text[start_idx:end_idx]
    except ValueError:
        return generated_text  # Fallback if JSON is not found