In [None]:
import ast
import json
import logging
import re
import textwrap

import numpy as np
import spacy
import torch
from google.colab import drive
from google.colab import userdata
from huggingface_hub import login, whoami
from sklearn.exceptions import NotFittedError
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline



# Mount Google Drive inside the Colab VM at /content/gdrive.
drive.mount('/content/gdrive')
# Make the project folder on the mounted drive the working directory (IPython magic).
%cd /content/gdrive/MyDrive/Transformers
# Read the Hugging Face token from Colab's user secrets instead of hardcoding it.
key = userdata.get('HUGGING_FACE_TOKEN')

Mounted at /content/gdrive
/content/gdrive/MyDrive/Transformers


In [None]:
# Authenticate with the Hugging Face Hub using the token read from Colab secrets.
if key:
    try:
        # login() itself can raise on an invalid or expired token, so it has to sit
        # inside the try block for the friendly failure message below to be reachable.
        login(key)

        # Verify the login status
        user_info = whoami()  # Get user information
        print(f"Successfully logged in as {user_info['name']}.")
    except Exception as e:
        print("Login failed. Please check your Hugging Face token.")
        print(f"Error: {e}")
else:
    print("No Hugging Face token provided.")

Successfully logged in as pufff28.


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
class AutoGrader:
    """Heuristic grader that scores a free-text answer against a numbered rubric.

    Each requirement is scored from four signals: key-concept coverage, TF-IDF /
    key-phrase similarity to the requirement, a structural "depth" estimate, and
    the answer's similarity to the rubric as a whole.
    """

    def __init__(self):
        """Load the spaCy English pipeline and configure the TF-IDF vectorizer."""
        self.nlp = spacy.load('en_core_web_sm')
        self.vectorizer = TfidfVectorizer(
            stop_words='english',
            ngram_range=(1, 3),  # Include phrases up to 3 words
            max_features=5000,
            min_df=1,
            strip_accents='unicode'
        )

    def _preprocess_response(self, response):
        """Normalize whitespace and strip markdown/URLs while keeping line breaks.

        Args:
            response (str): Raw text (rubric or answer).

        Returns:
            str: Cleaned text with the same number of lines as the normalized input.
        """
        # Standardize newlines FIRST. The previous implementation collapsed all
        # whitespace (including newlines) up front, which left parse_requirements
        # with a single line and therefore a single requirement.
        response = response.replace('\r\n', '\n').replace('\r', '\n')

        cleaned_lines = []
        for line in response.split('\n'):
            # Remove any URLs that might be present
            line = re.sub(r'http\S+|www\.\S+', '', line)
            # Remove any markdown formatting that might interfere with grading
            line = re.sub(r'[#*_~`]', '', line)
            # Collapse runs of spaces/tabs (done after the removals so they leave no gaps)
            line = ' '.join(line.split())
            # Drop stray whitespace in front of punctuation
            line = re.sub(r'\s+([.,!?])', r'\1', line)
            cleaned_lines.append(line)

        return '\n'.join(cleaned_lines)

    def parse_requirements(self, requirements_text):
        """Parse a numbered rubric into requirement dictionaries.

        A line starting with ``<number>.`` opens a new requirement; following
        non-empty lines are appended to it. A ``(N points)`` marker sets the point
        value (case-insensitive); requirements without one default to 30 points.

        Args:
            requirements_text (str): Rubric text, one numbered requirement per line.

        Returns:
            list[dict]: Items with keys ``content``, ``points`` and ``key_concepts``.
        """
        requirements_text = self._preprocess_response(requirements_text)

        points_pattern = r'\((\d+)\s*points?\)'
        requirements = []
        current_requirement = None

        for line in requirements_text.strip().split('\n'):
            line = line.strip()
            if not line:
                continue

            # Check for new requirement (starts with number)
            if re.match(r'^\d+\.', line):
                if current_requirement:
                    requirements.append(current_requirement)

                # Extract points
                points_match = re.search(points_pattern, line, re.IGNORECASE)
                points = int(points_match.group(1)) if points_match else 30

                # Clean the content
                content = re.sub(points_pattern, '', line, flags=re.IGNORECASE)  # Remove points
                content = re.sub(r'^\d+\.\s*', '', content)  # Remove number
                content = content.strip(': ')  # Remove extra characters

                current_requirement = {
                    'content': content,
                    'points': points,
                    'key_concepts': self._extract_key_concepts(content)
                }
            elif current_requirement:
                # Continuation line: extend the current requirement's text and concepts
                current_requirement['content'] += ' ' + line
                known = current_requirement['key_concepts']
                for concept in self._extract_key_concepts(line):
                    # Skip repeats so coverage is not diluted by duplicate concepts
                    if concept not in known:
                        known.append(concept)

        # Add final requirement
        if current_requirement:
            requirements.append(current_requirement)

        return requirements

    def _extract_key_concepts(self, text):
        """Collect lower-cased noun chunks, named entities and root verb phrases.

        Args:
            text (str): Text to analyze with the spaCy pipeline.

        Returns:
            list[str]: Unique concepts longer than 3 characters, sorted so the
            result order is stable between runs.
        """
        doc = self.nlp(text)
        concepts = []

        # Extract noun phrases and named entities
        for chunk in doc.noun_chunks:
            concepts.append(chunk.text.lower())

        for ent in doc.ents:
            concepts.append(ent.text.lower())

        # Extract important verb phrases
        for token in doc:
            if token.pos_ == "VERB" and token.dep_ == "ROOT":
                phrase = self._get_verb_phrase(token)
                if phrase:
                    concepts.append(phrase.lower())

        # Clean and deduplicate concepts
        concepts = [c.strip() for c in concepts if len(c.strip()) > 3]
        return sorted(set(concepts))

    def _get_verb_phrase(self, verb_token):
        """Return the verb followed by the subtrees of its dobj/pobj/attr children."""
        phrase_parts = [verb_token.text]

        for child in verb_token.children:
            if child.dep_ in ["dobj", "pobj", "attr"]:
                phrase_parts.extend([t.text for t in child.subtree])

        return " ".join(phrase_parts)

    def grade_response(self, requirements, student_response):
        """Score a response against every requirement and build feedback.

        Args:
            requirements (list[dict]): Output of ``parse_requirements``.
            student_response (str): The answer being graded.

        Returns:
            dict: ``total_points``, ``max_points``, ``percentage``, per-requirement
            ``feedback`` (also exposed as ``rubric_breakdown``) and a ``summary``
            of strengths, issues and suggestions gathered from that feedback.
        """
        total_points = 0
        max_points = 0
        feedback = []

        student_concepts = set(self._extract_key_concepts(student_response))
        # The score floor below is only meant for answers that attempt the question
        has_content = bool(student_response.strip())

        # Calculate global coherence
        global_relevance = self._calculate_similarity(
            ' '.join(req['content'] for req in requirements),
            student_response
        )

        weights = {
            'concept': 0.35,
            'relevance': 0.25,
            'depth': 0.25,
            'coherence': 0.15
        }

        for req in requirements:
            max_points += req['points']

            concept_coverage = self._calculate_concept_coverage(req['key_concepts'], student_concepts)
            content_relevance = self._calculate_similarity(req['content'], student_response)
            depth_score = self._analyze_response_depth(student_response, req['content'])

            # Calculate weighted score
            score = (
                concept_coverage * weights['concept'] +
                content_relevance * weights['relevance'] +
                depth_score * weights['depth'] +
                global_relevance * weights['coherence']
            )
            score = min(1.0, score)

            # Ensure a minimum 30% score, but only if the response attempts the requirement
            if has_content:
                score = max(0.3, score)

            points_earned = round(score * req['points'])
            total_points += points_earned

            # Integer comparison for the 70% pass mark: 0.7 * 10 evaluates to
            # 7.000000000000001 in floating point, which wrongly failed 7/10.
            passed = points_earned * 10 >= 7 * req['points']

            # Generate feedback
            feedback.append({
                'requirement': req['content'],
                'points_earned': points_earned,
                'max_points': req['points'],
                'status': "✓" if passed else "×",
                'missing_concepts': list(set(req['key_concepts']) - student_concepts),
                'feedback': self._generate_requirement_feedback(
                    score, concept_coverage, content_relevance, depth_score
                )
            })

        # The 'positive' / 'issue' / 'suggestion' keys live on the nested feedback
        # entries, not on the per-requirement dicts, so flatten before filtering.
        detailed_entries = [
            entry
            for item in feedback
            for entry in (item['feedback'] or [])
            if isinstance(entry, dict)
        ]

        return {
            'total_points': total_points,
            'max_points': max_points,
            # Guard against an empty rubric (max_points == 0)
            'percentage': (total_points / max_points * 100) if max_points else 0.0,
            'feedback': feedback,
            'summary': {
                'strengths': [e for e in detailed_entries if 'positive' in e],
                'improvements_needed': [e for e in detailed_entries if 'issue' in e],
                'suggestions': [e['suggestion'] for e in detailed_entries if 'suggestion' in e]
            },
            'rubric_breakdown': feedback
        }

    def _analyze_response_depth(self, response, requirement):
        """Estimate how thoroughly the response develops a requirement.

        Only sentences whose similarity to the requirement exceeds 0.2 are
        considered. The score blends their count, average token length, entity
        count, a clause/connector complexity ratio and a paragraph-count term.

        Args:
            response (str): The answer being graded.
            requirement (str): Requirement text the sentences are compared with.

        Returns:
            float: Depth score in [0, 1]; 0.0 when no sentence is relevant.
        """
        doc = self.nlp(response)

        # Analyze relevant sentences with improved relevance threshold
        relevant_sents = [sent for sent in doc.sents
                          if self._calculate_similarity(sent.text, requirement) > 0.2]

        if not relevant_sents:
            return 0.0

        # Calculate multiple metrics
        avg_sent_length = float(np.mean([len(sent) for sent in relevant_sents]))
        num_entities = len([ent for sent in relevant_sents for ent in sent.ents])

        # Analysis of sentence complexity
        complexity_scores = []
        for sent in relevant_sents:
            # Count subordinate clauses
            num_subordinate = len([token for token in sent
                                   if token.dep_ in ['advcl', 'ccomp', 'xcomp']])
            # Count logical connectors
            num_connectors = len([token for token in sent
                                  if token.dep_ in ['cc', 'mark']])

            complexity = (1 + num_subordinate + 0.5 * num_connectors) / len(sent)
            complexity_scores.append(complexity)

        avg_complexity = float(np.mean(complexity_scores)) if complexity_scores else 0.0

        # Paragraph structure: this term was referenced but never defined, which
        # raised NameError for every response with a relevant sentence. Paragraphs
        # are blank-line separated; three or more earns full credit (heuristic).
        paragraphs = [p for p in re.split(r'\n\s*\n', response) if p.strip()]
        paragraph_score = min(1.0, len(paragraphs) / 3)

        # Combine metrics with weights
        depth_score = (
            0.25 * min(1.0, len(relevant_sents) / 8) +    # Number of relevant sentences
            0.20 * min(1.0, avg_sent_length / 20) +       # Average sentence length
            0.20 * min(1.0, num_entities / 8) +           # Named entity usage
            0.20 * min(1.0, avg_complexity) +             # Sentence complexity
            0.15 * paragraph_score                        # Paragraph structure
        )

        return float(depth_score)

    def _generate_requirement_feedback(self, overall_score, concept_score, relevance_score, depth_score):
        """Build feedback entries for one requirement from its component scores.

        Returns:
            list: A mix of plain strings and dicts. Dicts carry ``area`` plus
            either ``issue``/``suggestion`` or ``positive``.
        """
        feedback = []

        # Concept coverage feedback
        if concept_score < 0.7:
            missing = "Many" if concept_score < 0.4 else "Some"
            feedback.append({
                'area': 'Key Concepts',
                'issue': f"{missing} key concepts could be better addressed",
                'suggestion': "Try to explicitly discuss concepts like [specific concepts]. Consider using course terminology more directly."
            })

        # Relevance feedback
        if relevance_score < 0.7:
            if relevance_score < 0.4:
                feedback.append("Response could be more focused on the specific requirement")
            else:
                feedback.append("Could improve alignment with the requirement")

        # Depth feedback
        if depth_score < 0.7:
            if depth_score < 0.4:
                feedback.append({
                    'area': 'Analysis Depth',
                    'issue': "Response lacks detailed analysis",
                    'suggestion': "Expand your response with specific examples and more detailed explanations of key points. Add supporting evidence for your claims."
                })
            else:
                feedback.append("Could expand on some points with more detail")

        strengths = []
        if concept_score >= 0.7:
            strengths.append("Good use of key concepts")
        if relevance_score >= 0.7:
            strengths.append("Strong relevance to the topic")

        if strengths:
            feedback.append({
                'area': 'Strengths',
                'positive': ', '.join(strengths)
            })

        # Add positive feedback for good scores
        if all(score >= 0.7 for score in [concept_score, relevance_score, depth_score]):
            feedback.append("Strong, well-developed response that addresses the requirement effectively")
        elif any(score >= 0.8 for score in [concept_score, relevance_score, depth_score]):
            feedback.append("Shows good understanding in some areas")

        return feedback

    def _calculate_similarity(self, text1, text2):
        """Blend TF-IDF cosine similarity (70%) with key-phrase overlap (30%).

        The vectorizer is refit on just the two texts for every call. Any failure
        falls back to word-level Jaccard similarity.

        Returns:
            float: Similarity score; 0.0 when either text is blank.
        """
        try:
            # Ensure non-empty input
            if not text1.strip() or not text2.strip():
                return 0.0

            texts = [text1.lower(), text2.lower()]

            try:
                tfidf_matrix = self.vectorizer.fit_transform(texts)
                if tfidf_matrix.shape[1] == 0:  # Check if vocabulary is empty
                    return self._fallback_similarity(text1, text2)
                base_similarity = float(cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0])
            except (ValueError, NotFittedError) as e:
                logging.warning(f"TF-IDF calculation failed: {e}")
                return self._fallback_similarity(text1, text2)

            # Add weight for key phrase matches
            doc1 = self.nlp(text1.lower())
            doc2 = self.nlp(text2.lower())

            # Extract key phrases (noun chunks and named entities)
            phrases1 = set([chunk.text for chunk in doc1.noun_chunks] + [ent.text for ent in doc1.ents])
            phrases2 = set([chunk.text for chunk in doc2.noun_chunks] + [ent.text for ent in doc2.ents])

            # Share of text1's phrases that also occur in text2
            phrase_overlap = len(phrases1 & phrases2) / len(phrases1) if phrases1 else 0

            # Weighted combination
            return 0.7 * base_similarity + 0.3 * phrase_overlap

        except Exception as e:
            logging.error(f"Error in similarity calculation: {e}")
            return self._fallback_similarity(text1, text2)

    def _fallback_similarity(self, text1, text2):
        """Jaccard similarity over the lower-cased, whitespace-split word sets."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        if not words1 or not words2:
            return 0.0

        intersection = len(words1.intersection(words2))
        union = len(words1.union(words2))

        return intersection / union if union > 0 else 0.0

    def _calculate_concept_coverage(self, required_concepts, student_concepts):
        """Return the fraction of distinct required concepts present in the answer.

        Args:
            required_concepts (Iterable[str]): Concepts expected by the requirement.
            student_concepts (Iterable[str]): Concepts extracted from the answer.

        Returns:
            float: Coverage in [0, 1]; 1.0 when nothing is required.
        """
        # Deduplicate first: matches are counted on a set, so dividing by a list
        # length that includes repeats would understate coverage.
        required = set(required_concepts)
        if not required:
            return 1.0

        return len(required & set(student_concepts)) / len(required)

In [None]:
def clean_llm_response_question(response):
    """
    Extract the discussion question from the LLM response.

    Args:
        response (dict | str): Assistant message dict with a 'content' key, the
            string form of such a dict, or the raw assistant text itself.

    Returns:
        str: Cleaned discussion question. If extraction fails, the original
        ``response`` is returned unchanged.
    """
    try:
        message = response
        if isinstance(response, str):
            # SECURITY: the text comes from a model, so it must never be passed to
            # eval(). literal_eval only accepts Python literals and cannot run code.
            try:
                parsed = ast.literal_eval(response)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                parsed = None
            # A string that is not a dict literal is treated as the message text itself
            message = parsed if isinstance(parsed, dict) else {'content': response}

        # Extract content from the assistant's message
        content = message['content']

        # Find the question after "Discussion Question:" using regex
        match = re.search(r'\*\*Discussion Question:\*\* (.*?)(?=\n|$)', content)
        if match and match.group(1).strip():
            return match.group(1).strip()

        # Fallback: return everything after "Discussion Question:", dropping the
        # closing bold marker when the question starts on the following line.
        if "Discussion Question:" in content:
            tail = content.split("Discussion Question:")[-1]
            return tail.strip().lstrip('*').strip()

        return content.strip()

    except Exception as e:
        print(f"Error cleaning response: {e}")
        return response

In [None]:
def clean_llm_response_requirements(response):
    """
    Extract and clean requirements from the LLM response, ensuring proper format for grading.

    Args:
        response (list | str): Chat transcript (list of role/content dicts) or raw text.

    Returns:
        str: One numbered requirement per line when a numbered list is found,
        otherwise the stripped text. On error, ``str(response)``.
    """
    try:
        markers = ('1.', '2.', '3.')

        # Find the assistant's content with the requirements
        if isinstance(response, list):
            for item in response:
                if isinstance(item, dict) and item.get('role') == 'assistant':
                    content = item.get('content') or ''
                    if all(marker in content for marker in markers):
                        # Group each numbered line with its continuation lines
                        requirements = []
                        current_req = []

                        for line in content.split('\n'):
                            stripped = line.strip()
                            if re.match(r'^\d+\.', stripped):
                                if current_req:
                                    requirements.append(' '.join(current_req))
                                    current_req = []
                                current_req.append(stripped)
                            elif stripped and current_req:
                                current_req.append(stripped)

                        if current_req:
                            requirements.append(' '.join(current_req))

                        return '\n'.join(requirements)

        # Fallback: try to extract any numbered list format
        if isinstance(response, str):
            content = response
        elif isinstance(response, list):
            # Only look at assistant text. str(response) would also expose the
            # system/user prompts, whose own numbered lists could be mistaken
            # for requirements.
            assistant_texts = [
                str(item.get('content') or '')
                for item in response
                if isinstance(item, dict) and item.get('role') == 'assistant'
            ]
            content = '\n'.join(assistant_texts) if assistant_texts else str(response)
        else:
            content = str(response)

        if all(marker in content for marker in markers):
            # An item ends only at whitespace + "<number>." + whitespace, so decimals
            # inside the text (e.g. "2.5 points") no longer truncate it.
            pattern = r'(?<!\S)\d+\.\s+\D.*?(?=\s+\d+\.\s|\s*$)'
            requirements = re.findall(pattern, content, re.DOTALL)
            if requirements:
                return '\n'.join(req.strip() for req in requirements)

        return content.strip()

    except Exception as e:
        print(f"Error cleaning requirements: {e}")
        return str(response)

In [None]:
def clean_llm_response_feedback(response):
    """
    Split the assistant's feedback into its labelled sections.

    Recognized headings are 'Missing Concepts:', 'Areas of Strength:' and
    'Suggestions:' / 'Suggestions for Improvement:'. Each section runs from its
    heading to the next recognized heading (or the end of the text).

    Args:
        response (list | str): Chat transcript (list of role/content dicts) or
            the assistant text itself.

    Returns:
        dict: Maps 'Missing Concepts', 'Areas of Strength' and 'Suggestions' to
        their stripped section text. Only sections that are present appear;
        an empty dict is returned on error.
    """
    try:
        # Find the assistant's response
        if isinstance(response, str):
            assistant_response = response
        else:
            assistant_response = next(
                (entry.get('content') or ''
                 for entry in response
                 if isinstance(entry, dict) and entry.get('role') == 'assistant'),
                ""
            )

        heading_pattern = re.compile(
            r'(?P<missing>Missing Concepts:)'
            r'|(?P<strength>Areas of Strength:)'
            r'|(?P<suggest>Suggestions(?: for Improvement)?:)'
        )
        section_keys = {
            'missing': 'Missing Concepts',
            'strength': 'Areas of Strength',
            'suggest': 'Suggestions',
        }

        cleaned_content = {}
        headings = list(heading_pattern.finditer(assistant_response))
        for index, heading in enumerate(headings):
            # A section ends where the next heading starts
            if index + 1 < len(headings):
                section_end = headings[index + 1].start()
            else:
                section_end = len(assistant_response)
            section_text = assistant_response[heading.end():section_end].strip()
            # Later occurrences overwrite earlier ones (last heading wins)
            cleaned_content[section_keys[heading.lastgroup]] = section_text

        return cleaned_content

    except Exception as e:
        print(f"Error cleaning response: {e}")
        return {}


In [None]:
##Question Generation

def create_questions(pipe, text):
    """Ask the chat pipeline for one discussion question about ``text``.

    Args:
        pipe: Callable chat pipeline; called with a message list and
            ``max_new_tokens=1024``.
        text (str): Source passage the question should be based on.

    Returns:
        The cleaned question produced by ``clean_llm_response_question``.

    Raises:
        ValueError: If nothing was generated or cleaning left nothing.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        system_turn = {
            "role": "system",
            "content": "You are a teaching assistant tasked with creating discussion questions.",
        }
        user_turn = {
            "role": "user",
            "content": f"Create one discussion question from this text: {text}. Denote the discussion question by 'Discussion Question:'",
        }
        generations = pipe([system_turn, user_turn], max_new_tokens=1024)

        transcript = generations[0]["generated_text"] if generations else None
        if not transcript:
            raise ValueError("No question generated")

        # The last transcript entry is the model's reply
        question_text = clean_llm_response_question(transcript[-1])
        if question_text:
            return question_text

        raise ValueError("Question cleaning resulted in empty content")

    except Exception as e:
        logging.error(f"Error generating question: {e}")
        raise

In [None]:
# def create_requirements(text, cleaned_question):
#   messages = [
#       {"role": "system", "content": "You are a teaching assistant tasked with creating grading requirements for students responses to a discussion question"},
#       {"role": "user", "content": f"Create requirements for this discussion question given the text: \n\n{text}\n\n{cleaned_question}. \n\nAssign a point value Denote the requirements by 'Requirements:'"},
#   ]
#   outputs = pipe(
#       messages,
#       max_new_tokens=1024,
#   )

#   requirements = outputs[0]["generated_text"][-1]
#   cleaned_r = clean_llm_response_requirements(requirements)

#   return cleaned_r

In [None]:
def create_requirements(pipe, text, cleaned_question):
    """Ask the chat pipeline for a point-valued grading rubric.

    Args:
        pipe: Callable chat pipeline; called with a message list and
            ``max_new_tokens=1024``.
        text (str): Source passage the question was based on.
        cleaned_question (str): Discussion question to write requirements for.

    Returns:
        The requirements text produced by ``clean_llm_response_requirements``.

    Raises:
        ValueError: If nothing was generated or cleaning left nothing.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        system_prompt = """You are a teaching assistant creating grading requirements.
            Create 3-4 clear requirements that focus on the main concepts and content rather than technical writing elements.
            Each requirement should:
            1. Focus on understanding and analysis of the core content
            2. Be clearly measurable
            3. Have a specific point value
            4. Total exactly 100 points

            Format as:
            1. [Main concept requirement] (X points): [Brief description of what's expected]
            'X points' must total exactly 100 points.
            Avoid:
            - Repetitive requirements
            - Overly technical writing criteria
            - More than 4 requirements
            - Breaking down points into sub-categories"""
        user_prompt = f"Create specific, measurable requirements for grading responses to:\n\n{cleaned_question}\n\nBased on:\n\n{text}"

        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        generations = pipe(conversation, max_new_tokens=1024)
        transcript = generations[0]["generated_text"] if generations else None
        if not transcript:
            raise ValueError("No requirements generated")

        # The whole transcript is handed over; the cleaner picks the assistant turn
        rubric_text = clean_llm_response_requirements(transcript)
        if rubric_text:
            return rubric_text

        raise ValueError("Requirements cleaning resulted in empty content")

    except Exception as e:
        logging.error(f"Error generating requirements: {e}")
        raise


In [None]:


class AnswerAssessment:
    """Combine LLM-written feedback with rubric-based automated grading."""

    def __init__(self, pipe, grader=None):
        """Store the chat pipeline and the grader.

        Args:
            pipe: Callable chat pipeline used to generate feedback.
            grader: Optional grader exposing ``parse_requirements`` and
                ``grade_response``. Defaults to a new ``AutoGrader`` so existing
                callers are unaffected.
        """
        self.pipe = pipe
        self.grader = grader if grader is not None else AutoGrader()

    def create_prompt(self, question, correct_concepts, student_answer):
      """Build the system/user message pair that asks the LLM for structured feedback.

      The system message fixes the section headings that ``_parse_llm_feedback``
      looks for; the user message carries the question, rubric and answer.
      """
      return [
          {"role": "system", "content": """You are an educational assistant providing specific, constructive feedback.
          Analyze the student's response against the grading requirements and provide detailed feedback.

          Focus on:
          1. Content coverage and accuracy
          2. Key concept usage
          3. Depth of analysis
          4. Connection to requirements

          Format your response exactly as:

          Missing Concepts:
          - [List specific key concepts that are missing or need more development]

          Areas of Strength:
          - [List what the student did well]

          Suggestions for Improvement:
          - [2-3 specific, actionable suggestions]
          """},
          {"role": "user", "content": f"""
          Question: {question}

          Grading Requirements:
          {correct_concepts}

          Student Response:
          {student_answer}
          """}
      ]

    def assess_answer(self, question, correct_concepts, student_answer):
        """Run LLM feedback plus automated grading and merge the results.

        Args:
            question (str): The discussion question.
            correct_concepts (str): Rubric text in numbered-requirement form.
            student_answer (str): The answer to assess.

        Returns:
            dict: ``llm_feedback``, ``automated_grading`` and ``composite_score``.
            On any failure the same keys are returned with empty/zero values
            plus an ``error`` message, so callers never receive an exception.
        """
        try:
            # Get LLM feedback
            prompt = self.create_prompt(question, correct_concepts, student_answer)
            llm_response = self.pipe(prompt, max_new_tokens=1024)

            if not llm_response or not llm_response[0].get("generated_text"):
                raise ValueError("No feedback generated by LLM")

            # Parse LLM feedback
            llm_feedback = self._parse_llm_feedback(llm_response[0]["generated_text"])

            # Get automated grading
            parsed_reqs = self.grader.parse_requirements(correct_concepts)
            grade_result = self.grader.grade_response(parsed_reqs, student_answer)

            # Calculate composite score
            composite_score = self._calculate_composite_score(grade_result, llm_feedback)

            # Create complete feedback dictionary
            feedback = {
                'llm_feedback': llm_feedback,
                'automated_grading': grade_result,
                'composite_score': composite_score
            }

            return feedback

        except Exception as e:
            logging.error(f"Error in answer assessment: {e}")
            # Return a valid feedback structure even on error
            return {
                'error': str(e),
                'llm_feedback': {'missing_concepts': [], 'areas_of_strength': [], 'suggestions': []},
                'automated_grading': {'total_points': 0, 'max_points': 100, 'percentage': 0},
                'composite_score': 0
            }

    def _parse_llm_feedback(self, feedback_text):
        """Sort the LLM's bullet points into the three feedback sections.

        Args:
            feedback_text (list | str): Chat transcript (the last message's
                content is used) or the feedback text itself.

        Returns:
            dict: ``missing_concepts``, ``areas_of_strength`` and ``suggestions``,
            each a list of bullet texts (empty when a section is absent).
        """
        feedback_dict = {
            'missing_concepts': [],
            'areas_of_strength': [],
            'suggestions': []
        }

        # Handle case where feedback_text is a list
        if isinstance(feedback_text, list):
            # Extract the content from the last message if it exists
            if feedback_text and isinstance(feedback_text[-1], dict):
                feedback_text = feedback_text[-1].get('content', '')
            else:
                return feedback_dict

        # Convert to string if not already
        feedback_text = str(feedback_text)

        # Models do not always use "- " bullets; also accept "*", the bullet
        # character and numbered items ("1." / "1)").
        bullet_pattern = re.compile(r'^(?:[-*\u2022]|\d+[.)])\s+(.+)$')
        current_section = None

        for line in feedback_text.split('\n'):
            line = line.strip()

            if 'Missing Concepts:' in line:
                current_section = 'missing_concepts'
            elif 'Areas of Strength:' in line:
                current_section = 'areas_of_strength'
            elif 'Suggestions for Improvement:' in line:
                current_section = 'suggestions'
            elif current_section:
                bullet = bullet_pattern.match(line)
                if bullet:
                    feedback_dict[current_section].append(bullet.group(1).strip())

        return feedback_dict

    def _calculate_composite_score(self, grade_result, llm_feedback):
        """Adjust the automated percentage by the LLM's findings.

        Each missing concept subtracts 2 points and each strength adds 2; the
        result is clamped to [0, 100].

        Returns:
            float: Composite score. If the inputs are malformed, the automated
            score parsed so far (0.0 if none) is returned instead.
        """
        # Bound before the try so the except branch can always return it
        automated_score = 0.0
        try:
            automated_score = float(grade_result.get('percentage', 0))

            # Count missing concepts and strengths with safe access
            missing_concepts = llm_feedback.get('missing_concepts', [])
            strengths = llm_feedback.get('areas_of_strength', [])

            concept_penalty = len(missing_concepts) * -2  # -2 points per missing concept
            strength_bonus = len(strengths) * 2   # +2 points per strength

            composite_score = automated_score + concept_penalty + strength_bonus
            return max(0, min(100, composite_score))
        except Exception as e:
            logging.error(f"Error calculating composite score: {e}")
            return max(0, min(100, automated_score))  # Fall back to automated score on error


In [None]:
class ImprovedAutoGrader(AutoGrader):
    """AutoGrader variant with fuzzy concept matching and marker-based depth scoring."""

    def __init__(self):
        """Initialize the base grader and the matching thresholds."""
        super().__init__()
        self.similarity_threshold = 0.6
        # Minimum similarity for a student concept to count as a partial match
        self.concept_match_threshold = 0.25

    def _calculate_concept_coverage(self, required_concepts, student_concepts):
        """Score concept coverage, giving partial credit for similar concepts.

        An exact match counts 1. Otherwise the best similarity against any
        student concept is added when it exceeds ``concept_match_threshold``.

        Returns:
            float: Coverage in [0, 1]; 1.0 when nothing is required.
        """
        if not required_concepts:
            return 1.0

        matches = 0
        for req_concept in required_concepts:
            # Check for exact matches
            if req_concept in student_concepts:
                matches += 1
                continue

            best_match = max(
                (self._calculate_similarity(req_concept, student_concept)
                 for student_concept in student_concepts),
                default=0
            )

            if best_match > self.concept_match_threshold:
                matches += best_match

        return matches / len(required_concepts)

    def _analyze_response_depth(self, response, requirement):
        """Estimate depth from sentence count, sentence complexity and evidence markers.

        ``requirement`` is accepted for signature compatibility with the base
        class but is not used by this implementation.

        Returns:
            float: Depth score in [0, 1].
        """
        doc = self.nlp(response)

        sentences = list(doc.sents)
        num_sentences = len(sentences)

        # A sentence counts as complex when its root has more than 3 direct children
        complex_sentences = sum(1 for sent in sentences if len(list(sent.root.children)) > 3)

        # Count supporting evidence. Markers are searched in the text rather than
        # compared token by token, because multi-word markers such as
        # "for example" can never equal a single token.
        evidence_markers = ['because', 'for example', 'such as', 'therefore', 'thus', 'consequently']
        lowered_response = response.lower()
        evidence_count = sum(
            len(re.findall(r'\b' + re.escape(marker) + r'\b', lowered_response))
            for marker in evidence_markers
        )

        # Normalized scores
        sent_score = min(1.0, num_sentences / 10)
        complexity_score = min(1.0, complex_sentences / num_sentences) if num_sentences > 0 else 0
        evidence_score = min(1.0, evidence_count / 5)

        # Weighted combination
        depth_score = (
            0.4 * sent_score +
            0.3 * complexity_score +
            0.3 * evidence_score
        )

        return depth_score

    def _generate_requirement_feedback(self, overall_score, concept_score, relevance_score, depth_score):
        """Build dict-only feedback entries for concept coverage and depth.

        Returns:
            list[dict]: Entries with ``area`` plus either ``issue``/``suggestion``
            or ``positive``.
        """
        feedback = []

        # Concept coverage feedback. The cutoff is 0.7, matching the point where
        # concept strengths start below. Gating on concept_match_threshold (0.25)
        # made the "some" wording unreachable and left scores between 0.25 and
        # 0.7 without any concept feedback.
        if concept_score < 0.7:
            severity = "significant" if concept_score < 0.4 else "some"
            feedback.append({
                'area': 'Key Concepts',
                'issue': f"Shows {severity} gaps in key concept coverage",
                'suggestion': "Focus on incorporating more specific terminology and concepts from the course material"
            })

        # Depth analysis feedback
        if depth_score < 0.7:
            if depth_score < 0.4:
                feedback.append({
                    'area': 'Analysis Depth',
                    'issue': "Response lacks sufficient depth and detail",
                    'suggestion': "Expand your analysis with specific examples and support your claims with evidence"
                })
            else:
                feedback.append({
                    'area': 'Analysis Depth',
                    'issue': "Some points could be developed further",
                    'suggestion': "Consider adding more supporting details and connecting your ideas more explicitly"
                })

        # Positive feedback
        strengths = []
        if concept_score >= 0.8:
            strengths.append("Excellent grasp of key concepts")
        elif concept_score >= 0.7:
            strengths.append("Good understanding of main concepts")

        if depth_score >= 0.8:
            strengths.append("Strong analytical depth")
        elif depth_score >= 0.7:
            strengths.append("Good level of detail and analysis")

        if strengths:
            feedback.append({
                'area': 'Strengths',
                'positive': ', '.join(strengths)
            })

        return feedback

In [None]:
def display_feedback(feedback_data):
    """Print a human-readable report for one assessment result.

    Args:
        feedback_data: Mapping read via ``.get``. Keys consulted:
            'composite_score', 'automated_grading' (read for 'percentage',
            'total_points', 'max_points', 'feedback') and 'llm_feedback'
            (read for 'missing_concepts', 'areas_of_strength', 'suggestions').
            Missing keys and explicit ``None`` values both fall back to
            defaults, so a partially filled result still prints in full.

    Returns:
        None. The report goes to stdout. Any exception raised while printing
        is logged with its traceback and replaced by a short notice; nothing
        is re-raised.
    """
    try:
        print("\n=== Feedback Summary ===")
        # `or` rather than a .get() default: a key that is present but None
        # would otherwise break the float format / the chained .get() below.
        print(f"Overall Score: {feedback_data.get('composite_score') or 0:.1f}%")

        automated_grading = feedback_data.get('automated_grading') or {}
        print(f"Automated Score: {automated_grading.get('percentage') or 0:.1f}%")
        print(f"Total Points: {automated_grading.get('total_points', 0)}/{automated_grading.get('max_points', 100)}")

        print("\n=== LLM Feedback ===")
        llm_feedback = feedback_data.get('llm_feedback') or {}

        llm_sections = (
            ('missing_concepts', "\nMissing Concepts:"),
            ('areas_of_strength', "\nAreas of Strength:"),
            ('suggestions', "\nSuggestions for Improvement:"),
        )
        for field_name, heading in llm_sections:
            entries = llm_feedback.get(field_name)
            if not entries:
                continue
            # A bare string would otherwise be printed one character per line.
            if isinstance(entries, str):
                entries = [entries]
            print(heading)
            for entry in entries:
                print(f"  - {entry}")

        print("\n=== Detailed Requirement Feedback ===")
        for item in automated_grading.get('feedback') or []:
            print(f"\n{item.get('status', '?')} Requirement: {item.get('requirement', 'N/A')}")
            print(f"Score: {item.get('points_earned', 0)}/{item.get('max_points', 0)} points")

            notes = item.get('feedback')
            if notes:
                # Treat a single note (string or dict) as a one-element list.
                if isinstance(notes, (str, dict)):
                    notes = [notes]
                print("\nFeedback:")
                for note in notes:
                    if isinstance(note, dict):
                        for label, value in note.items():
                            print(f"  {label}: {value}")
                    else:
                        print(f"  - {note}")

    except Exception as e:
        # Best-effort display: keep the notebook running, but record the
        # traceback the same way main() does.
        logging.error(f"Error displaying feedback: {e}", exc_info=True)
        print("\nError displaying feedback. Please check the log for details.")

In [None]:
def main(model_id="meta-llama/Llama-3.2-1B-Instruct",
         input_file='/content/gdrive/MyDrive/Transformers/test_files/sample_lecture.txt'):
    """Run the interactive question -> answer -> feedback demo end to end.

    Steps: build a text-generation pipeline, read the lecture text, call
    ``create_questions`` and ``create_requirements`` on it, read the student's
    answer from stdin, pass everything to ``AnswerAssessment.assess_answer``
    and print the result with ``display_feedback``.

    Args:
        model_id: Hugging Face model identifier handed to ``pipeline``.
            Defaults to the model this notebook was developed with.
        input_file: Path of the UTF-8 lecture text used as source material.
            Defaults to the sample lecture on the mounted Drive.

    Returns:
        None. Every exception (including the ``ValueError`` raised for an
        empty source text or empty answer) is caught, printed and logged
        with its traceback; nothing is re-raised.
    """
    try:
        # Initialize components
        pipe = pipeline(
            "text-generation",
            model=model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )

        answer_assessment = AnswerAssessment(pipe)
        answer_assessment.grader = ImprovedAutoGrader()

        # Get input text
        with open(input_file, 'r', encoding='utf-8') as file:
            source_text = file.read()
        print(len(source_text))
        # Whitespace-only text is as unusable as an empty file; this mirrors
        # the stripped check applied to the student's answer below.
        if not source_text.strip():
            raise ValueError("Empty source text provided")

        # Generate question
        question = create_questions(pipe, source_text)
        print("\nGenerated Question:")
        print(question)

        # Generate requirements
        requirements = create_requirements(pipe, source_text, question)
        print("\nGrading Requirements:")
        print(requirements)

        # Get student response (blocks until a line is entered on stdin)
        print("\nProvide Response:")
        student_answer = input().strip()

        if not student_answer:
            raise ValueError("Empty student answer provided")

        # Process feedback
        feedback = answer_assessment.assess_answer(
            question=question,
            correct_concepts=requirements,
            student_answer=student_answer
        )

        # Display results
        display_feedback(feedback)

    except Exception as e:
        print(f"Error in execution: {str(e)}")
        logging.error(f"Detailed error: {e}", exc_info=True)

In [None]:
# Entry point: run the demo when this cell/script executes as the top-level
# module. main() blocks on input() until the student's answer is typed in.
if __name__ == "__main__":
    main()

config.json:   0%|          | 0.00/877 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.47G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/189 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/54.5k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/296 [00:00<?, ?B/s]

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


9066


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.



Generated Question:
"In the context of the Emancipation Proclamation and the Civil Rights Movement, what role do you believe the struggle for freedom and equality has played in shaping the nation's values and institutions, and what steps can be taken to ensure that the pursuit of justice for all is continued and fulfilled?"

Grading Requirements:
1. [Understanding the Emancipation Proclamation and Civil Rights Movement] (20 points) - Analyze the role of the Emancipation Proclamation in shaping the nation's values and institutions. - Explain the significance of the Civil Rights Movement and its impact on the nation's progress towards freedom and equality. - Describe the struggles faced by African Americans during the Civil Rights Movement and the steps taken to achieve justice.
2. [Pursuit of Justice for All] (20 points) - Discuss the importance of the pursuit of justice for all, particularly in the context of the Civil Rights Movement. - Explain the concept of racial justice and the n

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.



=== Feedback Summary ===
Overall Score: 30.0%
Automated Score: 30.0%
Total Points: 6/20

=== LLM Feedback ===

=== Detailed Requirement Feedback ===

× Requirement: [Understanding the Emancipation Proclamation and Civil Rights Movement]  - Analyze the role of the Emancipation Proclamation in shaping the nation's values and institutions. - Explain the significance of the Civil Rights Movement and its impact on the nation's progress towards freedom and equality. - Describe the struggles faced by African Americans during the Civil Rights Movement and the steps taken to achieve justice. 2. [Pursuit of Justice for All]  - Discuss the importance of the pursuit of justice for all, particularly in the context of the Civil Rights Movement. - Explain the concept of racial justice and the need for continued struggle for equality and freedom. - Describe the role of individuals, organizations, and institutions in promoting justice and equality. 3. [The Struggle for Freedom and Equality]  - Analyze

In [None]:
# answer_assessment = AnswerAssessment(pipe=pipe)
# question = create_questions(lecture_content)
# requirements = create_requirements(lecture_content, question)
# student_answer = input(f"{question}\nProvide Response:\n")
# feedback = answer_assessment.assess_answer(question=question, correct_concepts=requirements, student_answer=student_answer)
# print(f"Feedback for Question:\n")
# print("LLM Feedback:", feedback['llm_feedback'])
# print("\nAutomated Grading Results:")
# print(f"Score: {feedback['automated_grading']['total_points']}/{feedback['automated_grading']['max_points']} ({feedback['automated_grading']['percentage']:.1f}%)")
# for item in feedback['automated_grading']['feedback']:
#     print(f"\n{item['status']} {item['requirement']}")
#     print(f"Points: {item['points_earned']}/{item['max_points']}")
#     if item['missing_concepts']:
#         print(f"Missing concepts: {', '.join(item['missing_concepts'])}")

In [None]:
# print(requirements)