## Import Libraries

In [1]:
import os
from datetime import datetime

import os

## Load video and audio recording libraries
import cv2
import pyaudio
import wave
import threading
import subprocess

## Load speech recognition tools
import whisper

## Define the class for handling video recording
from typing import Optional, Dict, Union
from pathlib import Path

## Load Natural Language Processing (NLP) tools
import spacy
from nltk.corpus import wordnet
from nltk.util import bigrams

## Setup logging
import logging

#### Intelligent Interview Recording System
#### This module provides a robust screen recording solution with audio-video capture 
#### and merging capabilities, designed for interview analysis and recording.

### Key Features:
- Simultaneous video and audio recording
- Unique filename generation with timestamps
- Automatic file merging using FFmpeg
- Configurable output directory
- Error handling for camera and audio device access

#### Class for recording functions to create a simplified version that starts and stops recording when specific buttons are pressed####

In [13]:
class ScreenRecorder:
    """
    A comprehensive screen recording class that captures video and audio 
    simultaneously and merges them into a single output file.

    Attributes:
        output_dir (str): Directory to save recorded files
        FORMAT (int): Audio format (16-bit integer)
        CHANNELS (int): Number of audio channels (mono)
        RATE (int): Audio sample rate
        CHUNK (int): Audio buffer size
    """

    def __init__(self, output_dir='recordings'):
        """
        Initialize the ScreenRecorder with configuration settings.

        Args:
            output_dir (str, optional): Directory to save recordings. 
                                        Defaults to 'recordings'.
        """
        # Ensure output directory exists
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        # Audio recording configuration
        self.FORMAT = pyaudio.paInt16  # 16-bit audio
        self.CHANNELS = 1  # Mono audio
        self.RATE = 44100  # Standard audio sample rate
        self.CHUNK = 1024  # Audio buffer size

        # File path placeholders
        self.video_path = None      # Raw video file path
        self.audio_path = None      # Raw audio file path
        self.final_output_path = None  # Merged video file path

        # Recording state management
        self.recording = False
        self.capture = None
        self.video_writer = None
        self.audio_stream = None
        self.audio_frames = []

    def start_recording(self):
        """
        Initiate screen recording process.

        Responsibilities:
        - Check if recording is already in progress
        - Generate unique filenames
        - Initialize video capture
        - Initialize audio capture
        - Start recording threads

        Raises:
            SystemError: If camera cannot be accessed
        """
        # Prevent multiple recording sessions
        if self.recording:
            print("Recording is already in progress.")
            return
        
        # Generate unique timestamps for filenames
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.video_path = os.path.join(self.output_dir, f"video_{timestamp}.avi")
        self.audio_path = os.path.join(self.output_dir, f"audio_{timestamp}.wav")
        self.final_output_path = os.path.join(self.output_dir, f"final_output_{timestamp}.mp4")

        # Initialize video capture
        self.capture = cv2.VideoCapture(0)
        if not self.capture.isOpened():
            print("Error: Unable to access the camera.")
            return

        # Retrieve video parameters
        frame_width = int(self.capture.get(3))   # Camera width
        frame_height = int(self.capture.get(4))  # Camera height
        frame_rate = int(self.capture.get(5)) or 30  # Frame rate or default

        # Create video writer object
        self.video_writer = cv2.VideoWriter(
            self.video_path,
            cv2.VideoWriter_fourcc(*"XVID"),  # Video codec
            frame_rate,
            (frame_width, frame_height)
        )

        # Initialize audio recording
        self.audio_frames = []
        self.audio_stream = pyaudio.PyAudio().open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            frames_per_buffer=self.CHUNK
        )

        # Start recording threads
        self.recording = True
        threading.Thread(target=self._record_video, daemon=True).start()
        threading.Thread(target=self._record_audio, daemon=True).start()

        print(f"Recording started. Video will be saved to: {self.video_path}")
        print(f"Audio will be saved to: {self.audio_path}")

    def _record_video(self):
        """
        Internal method to continuously capture video frames.
        Runs in a separate thread during recording.
        """
        while self.recording:
            ret, frame = self.capture.read()
            if not ret:
                print("Error: Unable to read from camera.")
                break
            self.video_writer.write(frame)

    def _record_audio(self):
        """
        Internal method to continuously capture audio frames.
        Runs in a separate thread during recording.
        """
        while self.recording:
            data = self.audio_stream.read(self.CHUNK)
            self.audio_frames.append(data)

    def stop_recording(self):
        """
        Stop the recording process and merge audio-video files.

        Returns:
            tuple: Paths for video, audio, and final merged file
        """
        # Prevent stopping non-existent recording
        if not self.recording:
            print("No recording in progress.")
            return None, None, None

        # Stop recording
        self.recording = False

        # Release system resources
        self.capture.release()
        self.video_writer.release()

        # Save audio file
        with wave.open(self.audio_path, 'wb') as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(pyaudio.PyAudio().get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b''.join(self.audio_frames))

        # Merge video and audio
        self._merge_video_audio()

        # Output recording details
        print(f"Recording stopped.")
        print(f"Video path: {self.video_path}")
        print(f"Audio path: {self.audio_path}")
        print(f"Final output path: {self.final_output_path}")

        return self.video_path, self.audio_path, self.final_output_path

    def _merge_video_audio(self):
        """
        Merge video and audio files using FFmpeg.

        Handles:
        - Video and audio merging
        - Error checking for FFmpeg availability
        - Codec conversion

        Raises:
            subprocess.CalledProcessError: FFmpeg command execution error
            FileNotFoundError: FFmpeg not installed
        """
        try:
            # FFmpeg command for merging files
            merge_command = [
                'ffmpeg',
                '-i', self.video_path,      # Input video
                '-i', self.audio_path,      # Input audio
                '-c:v', 'copy',             # Copy video codec
                '-c:a', 'aac',              # Convert audio to AAC
                '-shortest',                # Match shorter file duration
                self.final_output_path      # Output merged file
            ]
            
            # Execute FFmpeg merge
            subprocess.run(merge_command, check=True)
            print("Successfully merged video and audio")
        
        except subprocess.CalledProcessError as e:
            print(f"Error merging video and audio: {e}")
        except FileNotFoundError:
            print("FFmpeg not found. Please install FFmpeg to merge video and audio.")


In [14]:
recorder = ScreenRecorder()
input("Press Enter to start recording...")
recorder.start_recording()
    
input("Press Enter to stop recording...")
video_path, audio_path, final_output_path = recorder.stop_recording()

Recording started. Video will be saved to: recordings\video_20241203_234045.avi
Audio will be saved to: recordings\audio_20241203_234045.wav
Successfully merged video and audio
Recording stopped.
Video path: recordings\video_20241203_234045.avi
Audio path: recordings\audio_20241203_234045.wav
Final output path: recordings\final_output_20241203_234045.mp4


In [16]:
print(video_path,"  ", audio_path, "     ",final_output_path)

recordings\video_20241203_234045.avi    recordings\audio_20241203_234045.wav       recordings\final_output_20241203_234045.mp4


#### Apply OpenAI's Whisper model for handling audio/video transcription

In [2]:
class WhisperTranscriber:
    """
    A class to handle audio/video transcription using OpenAI's Whisper model.
    Supports multiple input formats and handles conversion to WAV using FFmpeg.
    """
    
    def __init__(self):
        """
        Initialize the transcriber with the 'base' Whisper model.
        """
        self.setup_logging()
        self.model = whisper.load_model("base")
        self.language = "en"

    def setup_logging(self) -> None:
        """Configure logging for the transcriber."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def convert_to_wav(self, input_file: Union[str, Path], output_dir: Optional[str] = None) -> Optional[str]:
        """
        Convert input audio/video file to WAV format using FFmpeg.
        
        Args:
            input_file (Union[str, Path]): Path to input audio/video file
            output_dir (Optional[str]): Directory for output WAV file
            
        Returns:
            Optional[str]: Path to output WAV file if successful, None otherwise
        """
        try:
            input_path = Path(input_file)
            if not input_path.exists():
                self.logger.error(f"Input file not found: {input_file}")
                return None
            
            # Determine output path
            if output_dir:
                output_path = Path(output_dir) / f"{input_path.stem}.wav"
            else:
                output_path = input_path.with_suffix('.wav')
                
            # Create output directory if it doesn't exist
            output_path.parent.mkdir(parents=True, exist_ok=True)
            
            # FFmpeg command for conversion
            command = [
                'ffmpeg',
                '-i', str(input_path),
                '-ar', '16000',  # Sample rate 16kHz
                '-ac', '1',      # Mono audio
                '-c:a', 'pcm_s16le',  # 16-bit PCM encoding
                str(output_path),
                '-y'  # Overwrite output file if exists
            ]
            
            self.logger.info(f"Converting {input_path} to WAV format")
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            
            if result.returncode != 0:
                self.logger.error(f"FFmpeg conversion failed: {result.stderr}")
                return None
                
            self.logger.info("Conversion successful")
            return str(output_path)
            
        except Exception as e:
            self.logger.error(f"Error during conversion: {str(e)}")
            return None
    
    def transcribe(self, 
                  input_file: Union[str, Path], 
                  output_dir: Optional[str] = None,
                  cleanup: bool = True) -> Dict:
        """
        Transcribe audio/video file using Whisper.
        
        Args:
            input_file (Union[str, Path]): Path to input audio/video file
            output_dir (Optional[str]): Directory for temporary WAV file
            cleanup (bool): Whether to delete temporary WAV file after transcription
            
        Returns:
            Dict: Transcription result containing text and other metadata
        """
        try:
            # Convert to WAV if input is not already WAV
            input_path = Path(input_file)
            if input_path.suffix.lower() != '.wav':
                self.logger.info("Converting input file to WAV format")
                wav_file = self.convert_to_wav(input_file, output_dir)
                if not wav_file:
                    raise RuntimeError("Failed to convert input file to WAV format")
            else:
                wav_file = str(input_path)
            
            # Perform transcription
            self.logger.info("Starting transcription")
            result = self.model.transcribe(wav_file, language=self.language)
            self.logger.info("Transcription completed successfully")
            
            # Cleanup temporary WAV file if requested
            if cleanup and input_path.suffix.lower() != '.wav':
                try:
                    os.remove(wav_file)
                    self.logger.info(f"Cleaned up temporary WAV file: {wav_file}")
                except Exception as e:
                    self.logger.warning(f"Failed to cleanup temporary file: {str(e)}")
            
            return result
            
        except Exception as e:
            self.logger.error(f"Transcription failed: {str(e)}")
            raise

In [3]:
# Initialize transcriber
transcriber = WhisperTranscriber()
    
input_file = r"G:\Intelligent AI-based Interview Analysis\recordings\final_output_20241203_234045.mp4"
try:
    result = transcriber.transcribe(input_file)
        
    # Store the transcribed text in a variable
    transcribed_text = result['text']
        
    # Print the transcribed text
    print(f"Transcription: {transcribed_text}")
        
    # You can now use transcribed_text for further processing
    # For example, save to a file or use in other functions
except Exception as e:
    print(f"Transcription failed: {e}")

  checkpoint = torch.load(fp, map_location=device)
2024-12-04 00:08:05,035 - INFO - Converting input file to WAV format
2024-12-04 00:08:05,036 - INFO - Converting G:\Intelligent AI-based Interview Analysis\recordings\final_output_20241203_234045.mp4 to WAV format
2024-12-04 00:08:05,550 - INFO - Conversion successful
2024-12-04 00:08:05,550 - INFO - Starting transcription
2024-12-04 00:08:07,654 - INFO - Transcription completed successfully
2024-12-04 00:08:07,655 - INFO - Cleaned up temporary WAV file: G:\Intelligent AI-based Interview Analysis\recordings\final_output_20241203_234045.wav


Transcription:  Welcome today we are talking about our graduation project. In my final year I do edu's my teammates in the university. It is my best interview analysis for Candidate Recognition.


## preprocessing transcribed text

In [4]:
import nltk
import string
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import re


nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to C:\Users\Mohamed
[nltk_data]     Walid\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to C:\Users\Mohamed
[nltk_data]     Walid\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to C:\Users\Mohamed
[nltk_data]     Walid\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [6]:
stop_words = set(stopwords.words('english'))
for char in string.punctuation:
    print(char,end= " ")

! " # $ % & ' ( ) * + , - . / : ; < = > ? @ [ \ ] ^ _ ` { | } ~ 

In [8]:
translated_table = str.maketrans('', '', string.punctuation)

#### Using `lemmatization` here in preprocessing as it reduces words to their base , considering the context and the actual meaning. which will be useful for `keyword extraction` as it help maintain the correct semantic meaning, improving the accuracy .


### get_wordnet_pos() =>
#### function used when preprocessing text for tasks like lemmatization, where words need to be reduced to their base forms based on their correct part of speech
#### takes a part-of-speech (POS) tag from the Penn Treebank tag set and maps it to a corresponding POS tag used by the WordNet lexical database,  The function ensures that the POS tag is translated into a format compatible with WordNet's system.

In [9]:
def get_wordnet_pos(tag):
    if tag.startswith('J'):
        return wordnet.ADJ  # Adjective
    elif tag.startswith('V'):
        return wordnet.VERB  # Verb
    elif tag.startswith('N'):
        return wordnet.NOUN  # Noun
    elif tag.startswith('R'):
        return wordnet.ADV  # Adverb
    else:
        return wordnet.NOUN  # Default to Noun

In [10]:
def preprocess_text(text):
    text = text.lower()
    
    text = re.sub(r'\d+', '', text)       # Remove numbers
    text = text.translate(translated_table)

    text_tokens = word_tokenize(text)

    filtered_words=[word for word in text_tokens if word not in stop_words ]
    # lemmatization => transforming words to their base or dictionary form
    lemmatizer=WordNetLemmatizer()

    lemma_words = []
    for word in filtered_words:
        pos_tag = nltk.pos_tag([word])[0][1]  # Get POS tag for each word
        wordnet_pos = get_wordnet_pos(pos_tag)  # Map POS to WordNet POS
        lemma_word = lemmatizer.lemmatize(word, pos=wordnet_pos)  # Lemmatize using WordNet POS
        lemma_words.append(lemma_word)

    processed_text = ' '.join(lemma_words)
    return processed_text

preprocessed_text = preprocess_text(transcribed_text)
print(f"Preprocessed Text: {preprocessed_text}")


Preprocessed Text: welcome today talk graduation project final year edus teammate university best interview analysis candidate recognition


## extract important keyboard from Preprocessed Text

In [11]:
from keybert import KeyBERT
import spacy


# Load spaCy model for POS tagging
nlp = spacy.load("en_core_web_sm")

# Sample preprocessed text, replace this with your actual preprocessed text
text = preprocessed_text
# Initialize KeyBERT model
kw_model = KeyBERT()

# Extract keywords with KeyBERT
raw_keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 2), stop_words='english', top_n=10)
raw_keywords = [kw[0] for kw in raw_keywords]  # Keep only the keywords without scores

# Filter keywords using spaCy for relevant parts of speech
filtered_keywords = []
for keyword in raw_keywords:
    doc = nlp(keyword)
    # Check if all tokens in the keyword are either NOUN or PROPN
    if all(token.pos_ in {"NOUN", "PROPN"} for token in doc):
        filtered_keywords.append(keyword)

print("Relevant job keywords:", filtered_keywords)

  warn(
2024-12-04 00:10:20,106 - INFO - Use pytorch device_name: cpu
2024-12-04 00:10:20,108 - INFO - Load pretrained SentenceTransformer: all-MiniLM-L6-v2


Relevant job keywords: ['interview analysis', 'analysis candidate', 'candidate recognition', 'interview', 'edus teammate', 'graduation project', 'candidate']


In [12]:
filtered_keywords

['interview analysis',
 'analysis candidate',
 'candidate recognition',
 'interview',
 'edus teammate',
 'graduation project',
 'candidate']

In [15]:
from nltk.util import ngrams


# Function to fetch synonyms for a word using WordNet
def get_synonyms(word):
    """Fetch a set of synonyms for a word using WordNet."""
    synonyms = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            synonyms.add(lemma.name())
    return synonyms

# Function to generate n-grams (1-gram and 2-gram) from the tokens
def generate_ngrams(tokens, n=2):
    """Generate n-grams from the list of tokens."""
    n_grams = ngrams(tokens, n)
    return [' '.join(gram) for gram in n_grams]

# Function to combine each word or phrase (bigram) with its synonyms
def combine_with_synonyms(doc, n=2):
    """Combine each word in the text with its synonyms (including bigrams)."""
    combined_dict = {}
    tokens = [token.text.lower() for token in doc]  # Get list of tokens from doc
    n_grams = generate_ngrams(tokens, n)  # Generate n-grams (1 or 2)
    
    for gram in n_grams:
        synonyms_for_bigram = set()
        words_in_bigram = gram.split()  # Split bigram into individual words
        
        # Get synonyms for each word in the bigram
        for word in words_in_bigram:
            synonyms_for_bigram.update(get_synonyms(word))
        
        combined_dict[gram] = list(synonyms_for_bigram)  # Store the synonyms for the bigram
    
    return combined_dict

In [16]:
# Get the synonyms for each token or bigram in the processed text
result = combine_with_synonyms(doc, n=2)

# Print the result
print(result)

{}


# get the synonyms for each word in the text and the similarity scores

In [None]:
from nltk.corpus import wordnet
from nltk.util import ngrams
from nltk.tokenize import word_tokenize

# Function to fetch synonyms for a word using WordNet
def get_synonyms(word):
    """Fetch a set of synonyms for a word using WordNet."""
    synonyms = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            synonyms.add(lemma.name())
    return synonyms

# Function to calculate similarity between words using Wu-Palmer Similarity
def get_similarity(word1, word2):
    """Calculate the similarity between two words using WordNet's Wu-Palmer similarity."""
    syn1 = wordnet.synsets(word1)
    syn2 = wordnet.synsets(word2)
    
    if syn1 and syn2:
        # Calculate similarity between the first synsets of both words
        return syn1[0].wup_similarity(syn2[0])  # Wu-Palmer similarity (range: 0 to 1)
    return 0  # Return 0 if no similarity found

# Function to generate n-grams (1-gram and 2-gram) from the tokens
def generate_ngrams(tokens, n=2):
    """Generate n-grams from the list of tokens."""
    n_grams = ngrams(tokens, n)
    return [' '.join(gram) for gram in n_grams]

# Function to combine each bigram with its synonyms and similarity
def combine_with_synonyms_and_similarity(doc, n=2, similarity_threshold=0.98):
    """Combine each bigram in the text with its synonyms and calculate similarity, filtering by similarity threshold."""
    combined_dict = {}
    tokens = [token.lower() for token in word_tokenize(doc)]  # Tokenize and lowercase the doc
    n_grams = generate_ngrams(tokens, n)  # Generate n-grams
    
    for gram in n_grams:
        synonyms_with_scores = {}
        words_in_bigram = gram.split()  # Split bigram into individual words
        
        for word in words_in_bigram:
            synonyms = get_synonyms(word)  # Get synonyms for the word
            
            for synonym in synonyms:
                if word != synonym:  # Avoid self-similarity
                    similarity_score = get_similarity(word, synonym)
                    # Only include synonyms with similarity >= 0.98
                    if similarity_score >= similarity_threshold:
                        synonyms_with_scores[synonym] = similarity_score
        
        if synonyms_with_scores:  # Only add to dictionary if there are valid synonyms
            combined_dict[gram] = synonyms_with_scores  # Store the bigram with synonyms and scores
    
    return combined_dict

# Example Usage
doc = "The quick brown fox jumps over the lazy dog"
result = combine_with_synonyms_and_similarity(doc, n=2, similarity_threshold=0.98)
print(result)

