In [40]:
import cv2
import speech_recognition as sr
import language_tool_python
import nltk
import time
import requests
import numpy as np
import os
import pyaudio
import wave
import threading
import time
import subprocess
import azure.cognitiveservices.speech as speechsdk
import json

In [41]:
# Download the NLTK 'punkt' data package through the NLTK downloader.
# NOTE(review): presumably needed by the word_tokenize / sent_tokenize calls
# used later in this notebook — confirm against the installed NLTK version.
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\adrij\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [42]:
# Make sure the per-user NLTK data directory is on NLTK's search path.
# The directory is derived from the environment (%APPDATA% on Windows, the
# home directory elsewhere) instead of hard-coding one user's profile path, so
# the notebook also runs on other machines.
_nltk_user_data_dir = os.path.join(
    os.environ.get("APPDATA", os.path.expanduser("~")), "nltk_data"
)
# Guarded so that re-running this cell does not append the same entry again.
if _nltk_user_data_dir not in nltk.data.path:
    nltk.data.path.append(_nltk_user_data_dir)

# Tokenizers used by the analysis functions defined below.
from nltk.tokenize import word_tokenize, sent_tokenize


In [43]:
# Download the NLTK 'punkt_tab' data package as well.
# NOTE(review): presumably the tokenizer tables newer NLTK releases look up
# instead of 'punkt' — confirm against the installed NLTK version.
nltk.download('punkt_tab')


[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\adrij\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [44]:
# Parameters for Audio (shared by record_audio)
FORMAT = pyaudio.paInt16  # sample format for the input stream; also determines the WAV sample width
CHANNELS = 1  # channel count for both the input stream and the saved WAV file
RATE = 16000  # sampling rate for stream and WAV; same value video_to_text passes to FFmpeg via -ar
CHUNK = 1024  # frames per buffer, and frames requested per stream.read() call

In [45]:
# Function to record video
def record_video(video_file, duration):
    """
    Records frames from the default camera into ``video_file`` while showing a
    live preview window.

    Recording ends after ``duration`` seconds, when 'q' is pressed in the
    preview window, or as soon as the camera stops delivering frames.

    Args:
        video_file: Path of the video file to write (XVID, 20 FPS, 640x480).
        duration: Maximum recording time in seconds.
    """
    frame_size = (640, 480)
    cap = cv2.VideoCapture(0)  # 0 for the default camera
    fourcc = cv2.VideoWriter_fourcc(*"XVID")
    out = cv2.VideoWriter(video_file, fourcc, 20.0, frame_size)

    # try/finally guarantees the camera, the writer and the preview window are
    # released even if a frame operation raises part-way through.
    try:
        if not cap.isOpened():
            print("Error: Unable to open the camera.")

        print("Recording video...")
        start_time = time.time()
        while time.time() - start_time < duration:
            ret, frame = cap.read()
            if not ret:
                break

            # The writer expects frames of exactly the size it was opened
            # with; cameras that deliver another resolution are resized.
            if (frame.shape[1], frame.shape[0]) != frame_size:
                frame = cv2.resize(frame, frame_size)

            out.write(frame)
            cv2.imshow("Recording Video", frame)

            # Stop video recording on 'q' key
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

        print("Video recording stopped.")
    finally:
        cap.release()
        out.release()
        cv2.destroyAllWindows()

In [46]:
# Function to record audio
def record_audio(audio_file, duration):
    """
    Records from the default input device for ``duration`` seconds and saves
    the capture to ``audio_file`` as a WAV file.

    Stream settings come from the module-level FORMAT, CHANNELS, RATE and
    CHUNK constants.

    Args:
        audio_file: Path of the WAV file to write.
        duration: Recording time in seconds.
    """
    audio = pyaudio.PyAudio()
    # Query the sample width while the PyAudio instance is still alive.
    sample_width = audio.get_sample_size(FORMAT)
    frames = []
    stream = None

    # try/finally guarantees the stream and the PyAudio instance are released
    # even when opening or reading the stream fails.
    try:
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                            rate=RATE, input=True,
                            frames_per_buffer=CHUNK)
        print("Recording audio...")

        start_time = time.time()
        while time.time() - start_time < duration:
            # Tolerate input overflows: a late read should cost a few samples,
            # not abort the whole recording.
            frames.append(stream.read(CHUNK, exception_on_overflow=False))

        print("Audio recording stopped.")
    finally:
        if stream is not None:
            stream.stop_stream()
            stream.close()
        audio.terminate()

    # Save audio to a file
    with wave.open(audio_file, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b"".join(frames))

In [47]:
# Function to extract audio from video and perform speech-to-text
def video_to_text(video_file, language='en-US'):
    """
    Extracts the audio track of ``video_file`` with FFmpeg into ``output.wav``
    (16 kHz, mono) and transcribes that file with Google speech recognition.

    Args:
        video_file: Path of the video whose audio should be transcribed.
        language: Language code passed to the recognizer.

    Returns:
        The recognized text, or a human-readable message when the audio could
        not be understood or the recognition service could not be reached.

    Raises:
        FileNotFoundError: If ``output.wav`` does not exist after the FFmpeg
            step.
    """
    audio_file = "output.wav"

    # Use absolute paths
    video_file = os.path.abspath(video_file)
    audio_file = os.path.abspath(audio_file)

    # Debug: Print paths
    print(f"Video file: {video_file}")
    print(f"Audio file: {audio_file}")

    # FFmpeg command to extract audio. The arguments are passed as a list so
    # the paths are never interpreted by a shell (quotes, spaces, '&', ...).
    command = ["ffmpeg", "-i", video_file, "-ar", "16000", "-ac", "1", "-y", audio_file]
    try:
        result = subprocess.run(command)
        if result.returncode != 0:
            print(f"Warning: FFmpeg exited with code {result.returncode}.")
    except OSError as exc:
        # e.g. FFmpeg is not installed or not on PATH
        print(f"Warning: FFmpeg could not be started: {exc}")

    # Verify that an audio file is available. This also passes when a file of
    # that name already existed before FFmpeg ran, so a failed extraction is
    # only reported through the warnings above.
    if not os.path.exists(audio_file):
        raise FileNotFoundError(f"Audio file {audio_file} was not created. Check the FFmpeg command.")

    # Initialize recognizer and process audio
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(audio_file) as source:
            audio = recognizer.record(source)

        # Convert audio to text
        return recognizer.recognize_google(audio, language=language)
    except sr.UnknownValueError:
        return "Speech recognition could not understand the audio."
    except sr.RequestError:
        return "Speech recognition service is unavailable."

In [48]:
import language_tool_python

def analyze_grammar(text):
    """
    Checks ``text`` with LanguageTool (en-US) and derives a simple grammar score.

    Matches for capitalization at sentence start and missing punctuation at
    paragraph end are ignored. The number of remaining issues and each issue
    are printed.

    Args:
        text: The text to check.

    Returns:
        A tuple ``(grammar_score, flagged_issues)``: the score starts at 100
        and loses 10 points per remaining issue (never below 0);
        ``flagged_issues`` is a list of ``(ruleId, message, context)`` tuples.
    """
    # Define rules to ignore (e.g., capitalization or punctuation errors)
    ignored_rules = {
        "UPPERCASE_SENTENCE_START",  # Ignore sentences not starting with a capital letter
        "PUNCTUATION_PARAGRAPH_END",  # Ignore missing punctuation at paragraph end
    }

    tool = language_tool_python.LanguageTool('en-US')
    try:
        matches = tool.check(text)
    finally:
        # A new LanguageTool instance is created on every call; close it so
        # its resources do not pile up across calls.
        tool.close()

    # Filter out matches with ignored rules
    relevant_matches = [match for match in matches if match.ruleId not in ignored_rules]

    # Simplistic scoring: reduce score by 10 points for each relevant issue
    grammar_score = max(0, 100 - len(relevant_matches) * 10)

    flagged_issues = [(match.ruleId, match.message, match.context) for match in relevant_matches]

    print(f"Grammar issues found: {len(relevant_matches)}")
    print("Flagged Issues:")
    for rule_id, message, context in flagged_issues:
        print(f" - {rule_id}: {message} (Context: {context})")

    return grammar_score, flagged_issues

# Example Usage
# Runs the grammar checker on a deliberately flawed sample sentence whenever this cell executes.
transcribed_text = "this is a example where grammar mistake are detected"
score, issues = analyze_grammar(transcribed_text)

# Each entry of `issues` is a (ruleId, message, context) tuple; only message and context are shown here.
print(f"Grammar Score: {score}")
for issue in issues:
    print(f"Issue: {issue[1]} - Context: {issue[2]}")


Grammar issues found: 1
Flagged Issues:
 - EN_A_VS_AN: Use “an” instead of ‘a’ if the following word starts with a vowel sound, e.g. ‘an article’, ‘an hour’. (Context: this is a example where grammar mistake are detec...)
Grammar Score: 90
Issue: Use “an” instead of ‘a’ if the following word starts with a vowel sound, e.g. ‘an article’, ‘an hour’. - Context: this is a example where grammar mistake are detec...


In [49]:
# Function for speaking rate analysis
def speaking_rate(text, duration):
    """
    Calculates speaking rate (words per minute).

    Args:
        text: The transcript to measure.
        duration: Speaking time in seconds.

    Returns:
        Words per minute, or 0.0 when ``duration`` is not positive (no rate
        can be computed for a zero-length recording).
    """
    if duration <= 0:
        return 0.0

    # Count only tokens that contain a letter or digit, so punctuation marks
    # produced by the tokenizer are not counted as spoken words.
    word_count = sum(
        1 for token in word_tokenize(text) if any(ch.isalnum() for ch in token)
    )
    return word_count / (duration / 60)  # words per minute


In [50]:
from pydub import AudioSegment, silence
from nltk.tokenize import word_tokenize

def analyze_pause_filler(audio_file, transcript):
    """
    Counts filler words in the transcript and measures long pauses in the audio.

    Args:
        audio_file: Path of the WAV recording.
        transcript: Text spoken in the recording.

    Returns:
        A tuple ``(filler_count, total_pause_time)``: the number of single- and
        multi-word fillers found in the transcript, and the summed length in
        seconds of all silent stretches lasting at least 2 seconds.
    """
    # Single word fillers
    single_fillers = {"uh", "um", "umm", "like", "well", "ah"}
    # Multi-word fillers
    multi_fillers = {"you know", "or something"}

    # ---- Count filler words ----
    words = word_tokenize(transcript.lower())
    filler_count = sum(1 for word in words if word in single_fillers)

    # Match multi-word fillers on whole tokens rather than raw substrings, so
    # that e.g. "for something" is not counted as "or something".
    for phrase in multi_fillers:
        phrase_tokens = phrase.split()
        span = len(phrase_tokens)
        filler_count += sum(
            1
            for start in range(len(words) - span + 1)
            if words[start:start + span] == phrase_tokens
        )

    # ---- Detect pauses ----
    audio = AudioSegment.from_file(audio_file, format="wav")
    silent_chunks = silence.detect_silence(
        audio,
        min_silence_len=2000,  # 2 seconds
        silence_thresh=audio.dBFS - 16  # threshold relative to this clip's own dBFS level
    )
    # detect_silence yields (start, end) pairs; dividing by 1000 converts the
    # values to seconds (min_silence_len above is given in the same unit).
    total_pause_time = sum((end - start) / 1000.0 for start, end in silent_chunks)

    return filler_count, total_pause_time


In [51]:
def calculate_pause_score(total_pause_time):
    """
    Turns the total pause time (seconds) into a score.

    Every second of pause costs 15 points; the deduction is capped at 100 and
    the result never drops below 0.
    """
    full_marks = 100
    points_lost_per_second = 15

    deduction = total_pause_time * points_lost_per_second
    # Cap the deduction at the full score
    capped = deduction if deduction < full_marks else full_marks

    remaining = full_marks - capped
    return remaining if remaining > 0 else 0


In [52]:
import azure.cognitiveservices.speech as speechsdk

In [53]:
def measure_fluency(audio_file, transcript, wpm_target=140):
    """
    Scores fluency from long pauses and the deviation from a target speaking rate.

    Args:
        audio_file: Path of the recording.
        transcript: Text spoken in the recording.
        wpm_target: Speaking rate (words per minute) that incurs no penalty.

    Returns:
        A score between 0 and 100, rounded to two decimals. Returns 0.0 when
        the recording has no duration, since no speaking rate can be computed.
    """
    # Get pauses from analyze_pause_filler
    _, total_pause_time = analyze_pause_filler(audio_file, transcript)

    duration_sec = AudioSegment.from_file(audio_file).duration_seconds
    if duration_sec <= 0:
        return 0.0

    # Speaking rate: count only tokens that contain a letter or digit, so
    # punctuation marks from the tokenizer are not counted as words.
    word_count = sum(
        1 for token in word_tokenize(transcript) if any(ch.isalnum() for ch in token)
    )
    wpm = (word_count / duration_sec) * 60

    # Pause penalty
    pause_penalty = total_pause_time * 8  # 8 points per second of pause
    # WPM penalty
    wpm_penalty = abs(wpm_target - wpm) * 0.5  # half a point per WPM of deviation

    fluency_score = max(0, 100 - pause_penalty - wpm_penalty)
    return round(fluency_score, 2)


In [54]:
def measure_completeness(transcript):
    """
    Scores the transcript by how little it repeats itself.

    Only purely alphabetic tokens are considered (case-insensitive). The share
    of repeated words is scaled to a deduction of at most 50 points from 100.
    Returns 0 when the transcript contains no alphabetic tokens; otherwise the
    score rounded to two decimals.
    """
    alpha_tokens = []
    for token in word_tokenize(transcript):
        if token.isalpha():
            alpha_tokens.append(token.lower())

    n_total = len(alpha_tokens)
    if not n_total:
        return 0

    n_distinct = len(set(alpha_tokens))
    # 1 - (distinct / total) is the fraction of repeated words
    repetition_penalty = (1 - n_distinct / n_total) * 50
    return round(max(0, 100 - repetition_penalty), 2)


In [55]:
import pronouncing
from nltk.tokenize import word_tokenize
from pydub import AudioSegment

def analyze_pronunciation(audio_file, transcript):
    """
    Combines three sub-scores into an overall pronunciation score.

    - accuracy: percentage of alphabetic transcript words for which
      ``pronouncing.phones_for_word`` returns at least one entry
    - fluency: result of ``measure_fluency``
    - completeness: result of ``measure_completeness``

    Returns ``(overall, accuracy, fluency, completeness)``; the overall score
    weights the parts 0.4 / 0.4 / 0.2 and is rounded to two decimals. All four
    values are 0 when the transcript has no alphabetic words.
    """
    tokens = [tok.lower() for tok in word_tokenize(transcript) if tok.isalpha()]
    if not tokens:
        return 0, 0, 0, 0

    # Share of words that have a dictionary pronunciation
    known_count = sum(1 for tok in tokens if pronouncing.phones_for_word(tok))
    accuracy_score = (known_count / len(tokens)) * 100

    fluency_score = measure_fluency(audio_file, transcript)
    completeness_score = measure_completeness(transcript)

    weighted = accuracy_score * 0.4 + fluency_score * 0.4 + completeness_score * 0.2
    return round(weighted, 2), accuracy_score, fluency_score, completeness_score


In [56]:
import cv2
import mediapipe as mp

# Short aliases for the MediaPipe solution modules used by analyze_body_language.
mp_pose = mp.solutions.pose
mp_face_mesh = mp.solutions.face_mesh
mp_hands = mp.solutions.hands

def analyze_body_language(video_file):
    """
    Scores posture, hand gestures and eye contact over all frames of a video.

    Args:
        video_file: Path of the video to analyze.

    Returns:
        A tuple ``(posture_score, gesture_score, eye_contact_score)``.
        ``posture_score`` and ``eye_contact_score`` are percentages of frames
        (0-100) that passed the respective check. ``gesture_score`` is the raw
        count of detected hand movements (it is not normalized). All three are
        0 when the video cannot be opened or contains no readable frames.
    """
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Unable to open video file.")
        return 0, 0, 0

    # Iris landmark indices for MediaPipe Face Mesh
    LEFT_IRIS = [474, 475, 476, 477]

    posture_score = 0
    gesture_score = 0
    eye_contact_score = 0
    total_frames = 0

    # Wrist position per hand in the previous frame, keyed by handedness
    # label, so each hand is only ever compared with itself.
    prev_wrists = {}

    # try/finally guarantees the capture is released even if a MediaPipe call
    # raises part-way through the video.
    try:
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose, \
             mp_face_mesh.FaceMesh(refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5) as face_mesh, \
             mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands:

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                total_frames += 1
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                pose_results = pose.process(rgb)
                face_results = face_mesh.process(rgb)
                hands_results = hands.process(rgb)

                # Posture: shoulders should be roughly level
                if pose_results.pose_landmarks:
                    landmarks = pose_results.pose_landmarks.landmark
                    left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
                    right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]

                    shoulder_diff = abs(left_shoulder.y - right_shoulder.y)
                    if shoulder_diff < 0.05:  # Adjust threshold
                        posture_score += 1

                # Eye contact: count the frame when the iris landmark's x value
                # lies in the 0.4-0.6 band.
                # NOTE(review): landmark coordinates are presumably normalized
                # to the image width, so this tests that the eye is
                # horizontally centered in the frame rather than the gaze
                # direction — confirm.
                if face_results.multi_face_landmarks:
                    for face_landmarks in face_results.multi_face_landmarks:
                        left_iris_x = face_landmarks.landmark[LEFT_IRIS[0]].x
                        if 0.4 < left_iris_x < 0.6:
                            eye_contact_score += 1

                # Gesture: count a movement when a hand's landmark 0 (the wrist
                # in MediaPipe's hand model) moved noticeably since the
                # previous frame.
                current_wrists = {}
                if hands_results.multi_hand_landmarks:
                    handedness = hands_results.multi_handedness or []
                    for idx, hand_landmarks in enumerate(hands_results.multi_hand_landmarks):
                        # Fall back to the detection index when no handedness
                        # entry is available for this hand.
                        if idx < len(handedness):
                            hand_key = handedness[idx].classification[0].label
                        else:
                            hand_key = idx

                        cx = hand_landmarks.landmark[0].x
                        cy = hand_landmarks.landmark[0].y
                        current_wrists[hand_key] = (cx, cy)

                        previous = prev_wrists.get(hand_key)
                        if previous is not None:
                            dist = ((cx - previous[0])**2 + (cy - previous[1])**2)**0.5
                            if dist > 0.02:  # Hand moved significantly
                                gesture_score += 1
                # Only hands seen in this frame serve as reference for the
                # next one, so movement is measured between consecutive frames.
                prev_wrists = current_wrists
    finally:
        cap.release()

    # A video that opens but yields no frames would otherwise divide by zero.
    if total_frames == 0:
        print("Error: Video contains no readable frames.")
        return 0, 0, 0

    # Normalize scores (gesture_score is intentionally left as a raw count)
    posture_score = min((posture_score / total_frames) * 100, 100)
    eye_contact_score = min((eye_contact_score / total_frames) * 100, 100)

    return posture_score, gesture_score, eye_contact_score


In [57]:
def calculate_overall_score(grammar_score, speaking_rate, filler_count, total_pause_time,
                            pronunciation_score, accuracy_score, fluency_score):
    """
    Blends the individual metrics into one weighted overall score.

    Speaking rate, filler count and pause time are first converted to scores:
    the rate loses one point per WPM of distance from 150, fillers cost 10
    points each (never below 0), and pauses are scored by
    ``calculate_pause_score``. The remaining inputs are used as given.
    """
    distance_from_target = abs(150 - speaking_rate)  # Target: 150 WPM
    speaking_rate_score = min(100, max(0, 100 - distance_from_target))
    filler_score = max(0, 100 - filler_count * 10)
    pause_score = calculate_pause_score(total_pause_time)

    # metric -> (weight, score); the weights add up to 1.0
    components = {
        "grammar": (0.3, grammar_score),
        "speaking_rate": (0.1, speaking_rate_score),
        "filler_words": (0.1, filler_score),
        "pause_patterns": (0.1, pause_score),
        "pronunciation": (0.2, pronunciation_score),
        "accuracy": (0.1, accuracy_score),
        "fluency": (0.1, fluency_score),
    }

    # Accumulate left to right, in the order listed above
    overall_score = None
    for weight, score in components.values():
        contribution = weight * score
        overall_score = contribution if overall_score is None else overall_score + contribution
    return overall_score


In [58]:
def _feedback_for(value, bands, fallback):
    """Returns the message of the first band whose upper bound exceeds ``value``, else ``fallback``."""
    for upper_bound, message in bands:
        if value < upper_bound:
            return message
    return fallback


def provide_feedback(speaking_rate, fluency_score, pronunciation_score):
    """
    Maps speaking rate, fluency and pronunciation scores to feedback sentences.

    Returns a tuple ``(pace_feedback, fluency_feedback, pronunciation_feedback)``.
    """
    # Speaking pace bands (words per minute): <80, <130, <170, <200, otherwise
    pace_bands = (
        (80, "You’re speaking too slowly. Consider speeding up to maintain engagement."),
        (130, "A moderate pace, but slightly faster speech may improve energy."),
        (170, "Perfect! Your speaking pace is in the ideal range."),
        (200, "A bit fast. Consider slowing down slightly to enhance clarity."),
    )
    pace_feedback = _feedback_for(
        speaking_rate,
        pace_bands,
        "Too fast. Try slowing down for better audience comprehension.",
    )

    # Fluency bands: <70, <90, otherwise
    fluency_bands = (
        (70, "Your speech is not fluent. Focus on reducing hesitations and improving rhythm."),
        (90, "Good fluency, but strive for smoother transitions between words."),
    )
    fluency_feedback = _feedback_for(
        fluency_score,
        fluency_bands,
        "Excellent fluency! Your speech flows naturally.",
    )

    # Pronunciation bands: <70, <90, otherwise
    pronunciation_bands = (
        (70, "Your pronunciation needs improvement. Try practicing individual sounds and word stresses."),
        (90, "Your pronunciation is quite good, but there’s room for slight improvement."),
    )
    pronunciation_feedback = _feedback_for(
        pronunciation_score,
        pronunciation_bands,
        "Excellent pronunciation! Keep it up.",
    )

    return pace_feedback, fluency_feedback, pronunciation_feedback



In [59]:
def main():
    """
    Records 30 seconds of video and audio, transcribes the speech and prints a
    speech assessment followed by body-language scores.

    The speech assessment is skipped when transcription fails, because
    video_to_text reports failures as ordinary message strings that would
    otherwise be graded as if they were the speaker's words.

    NOTE(review): record_audio writes "output.wav", and video_to_text also
    directs FFmpeg's extracted audio to "output.wav"; which of the two ends up
    being transcribed depends on whether the extraction succeeds — confirm
    this is intended.
    """
    video_file = "output.avi"
    audio_file = "output.wav"
    duration = 30  # seconds

    # Step 1: Record Video and Audio Simultaneously
    video_thread = threading.Thread(target=record_video, args=(video_file, duration))
    audio_thread = threading.Thread(target=record_audio, args=(audio_file, duration))

    video_thread.start()
    audio_thread.start()

    video_thread.join()
    audio_thread.join()

    # Step 2: Speech-to-Text
    transcript = video_to_text(video_file)

    print("Generated Transcript:")
    print(transcript)

    # Messages video_to_text returns instead of a transcript when recognition fails
    transcription_failures = (
        "Speech recognition could not understand the audio.",
        "Speech recognition service is unavailable.",
    )

    if not transcript.strip() or transcript in transcription_failures:
        print("Speech assessment skipped: no usable transcript was produced.")
    else:
        # Step 3: Grammar Analysis
        grammar_score, grammar_issues = analyze_grammar(transcript)

        # Step 4: Speaking Rate
        speaking_rate_value = speaking_rate(transcript, duration)

        # Step 5: Pause and Filler Word Analysis
        filler_word_count, total_pause_time = analyze_pause_filler(audio_file, transcript)

        # Step 6: Pronunciation Analysis
        pronunciation_score, accuracy_score, fluency_score, completeness_score = analyze_pronunciation(
            audio_file, transcript)

        # Step 7: Calculate Overall Score
        overall_score = calculate_overall_score(
            grammar_score, speaking_rate_value, filler_word_count, total_pause_time,
            pronunciation_score, accuracy_score, fluency_score
        )

        # Display Results
        print("\n--- Assessment Results ---")
        print(f"Grammar Score: {grammar_score}")
        print(f"Grammar Issues: {grammar_issues}")
        print(f"Speaking Rate (WPM): {speaking_rate_value}")
        print(f"Filler Word Count: {filler_word_count}")
        print(f"Total Pause Time: {total_pause_time} seconds")
        print(f"Pronunciation Score: {pronunciation_score} (Accuracy: {accuracy_score}, Fluency: {fluency_score}, Completeness: {completeness_score})")
        print(f"Overall Score: {overall_score}")

        pace_feedback, fluency_feedback, pronunciation_feedback = provide_feedback(
            speaking_rate_value, fluency_score, pronunciation_score)

        print(pace_feedback)
        print(fluency_feedback)
        print(pronunciation_feedback)

    # Body language analysis (independent of the transcript).
    # The gesture score is returned as well but is currently not reported.
    posture_score, gesture_score, eye_contact_score = analyze_body_language(video_file)
    print("\n--- Body Language Scores ---")
    print(f"Posture Score: {posture_score:.2f}/100")
    print(f"Eye Contact Score: {eye_contact_score:.2f}/100")


if __name__ == "__main__":
    main()

Recording audio...
Recording video...
Audio recording stopped.
Video recording stopped.
Video file: C:\Users\adrij\OneDrive\Desktop\Plugin\output.avi
Audio file: C:\Users\adrij\OneDrive\Desktop\Plugin\output.wav
Generated Transcript:
hello my name is address undergraduate student in the department of Electronics and electrical communication engineering and role in his BTech course at Kharagpur I am from Kolkata West Bengal you know sample test
Grammar issues found: 2
Flagged Issues:
 - MORFOLOGIK_RULE_EN_US: Possible spelling mistake found. (Context: ...mmunication engineering and role in his BTech course at Kharagpur I am from Kolkata W...)
 - MORFOLOGIK_RULE_EN_US: Possible spelling mistake found. (Context: ...neering and role in his BTech course at Kharagpur I am from Kolkata West Bengal you know ...)

--- Assessment Results ---
Grammar Score: 80
Grammar Issues: [('MORFOLOGIK_RULE_EN_US', 'Possible spelling mistake found.', '...mmunication engineering and role in his BTech course at