<a href="https://colab.research.google.com/github/Laughing-towel/discord_convert_bot/blob/main/domain_term_extractor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install youtube-transcript-api
!pip install nltk
!pip install pyspellchecker
!pip install matplotlib
!pip install seaborn
!pip install requests beautifulsoup4
!pip install gensim



In [None]:
# API and Network
import requests
from googleapiclient.discovery import build
from urllib.parse import quote_plus

# Data stuff
import json
import os
from threading import Lock
import re
import matplotlib.pyplot as plt
from collections import Counter
from wordcloud import WordCloud

# Text Processing
from bs4 import BeautifulSoup
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from gensim.models import Phrases
from gensim.models.phrases import Phraser
from sklearn.feature_extraction.text import TfidfVectorizer


# Utility and Miscs
from spellchecker import SpellChecker
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound
from concurrent.futures import ThreadPoolExecutor, as_completed

# NLTK Resource Downloads
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [None]:
# YouTube API setup
# SECURITY: the API key must never be committed inside the notebook. It is read
# from the YOUTUBE_API_KEY environment variable, falling back to an interactive
# prompt. Get a key from https://developers.google.com/youtube/v3/getting-started
# NOTE(review): the key that used to be hardcoded on this line is public and
# should be revoked.
from getpass import getpass

youtube_api_key = os.environ.get('YOUTUBE_API_KEY') or getpass('YouTube Data API key: ')
if not youtube_api_key:
    raise RuntimeError('A YouTube Data API key is required (set YOUTUBE_API_KEY).')
youtube = build('youtube', 'v3', developerKey=youtube_api_key)

In [None]:
# Domain-specific terms per domain: words that must be kept even if they
# appear in the stopword list.
# NOTE(review): process_text matches single word tokens against these sets, so
# the multi-word entries ('mastery rank', 'plains of eidolon', 'orb vallis')
# can never match as written.
domain_terms = {
    'Destiny 2': {
        'guardian', 'warlock', 'hunter', 'titan', 'vanguard', 'crucible', 'strike',
        'raid', 'nightfall', 'gambit', 'engram', 'exotic', 'legendary', 'light',
        'traveler', 'darkness', 'hive', 'cabal', 'vex', 'fallen', 'taken', 'scorn',
        'ghost', 'shader', 'sparrow', 'ship', 'loot',
    },
    # Can't think of any other terms for now, I don't play warframe that much
    'warframe': {
        'tenno', 'warframe', 'grineer', 'corpus', 'infested', 'orokin', 'mods',
        'mastery rank', 'syndicates', 'plains of eidolon', 'orb vallis', 'relics',
        'void', 'kuva',
    },
}

# Domains to process (each must be a key of domain_terms)
domains = ['Destiny 2', 'warframe']

# Base path for all data; change this to where you want to store everything
base_path = '/content/drive/MyDrive/Colab Notebooks/major_project/'

# The number of videos to fetch for each domain in the list
num_videos_to_fetch = 5


In [None]:
# Function to create domain-specific directories...return paths
def setup_domain_paths(domain):
    """Create the directory layout for a domain and return its paths.

    Args:
        domain: Domain name; used as the sub-directory name under ``base_path``
            and as the prefix of the video-ID file name.

    Returns:
        dict with the keys ``'base'`` (domain directory), ``'video_ids'``
        (path of the JSON file holding video IDs; the file itself is NOT
        created here), ``'spell_checked'`` and ``'processed'`` (directories).
    """
    domain_path = os.path.join(base_path, domain)

    paths = {
        'base': domain_path,
        'video_ids': os.path.join(domain_path, f'{domain}_video_ids.json'),
        'spell_checked': os.path.join(domain_path, 'spell_checked_transcripts/'),
        'processed': os.path.join(domain_path, 'processed_transcripts/')
    }

    # exist_ok=True replaces the separate exists() check, so repeated calls
    # cannot fail when the directory appears between the check and the creation.
    for key in ('base', 'spell_checked', 'processed'):
        os.makedirs(paths[key], exist_ok=True)

    return paths


Custom stopword list found at: https://gist.github.com/sebleier/554280

The one used by default is from: https://gist.githubusercontent.com/ZohebAbai/513218c3468130eacff6481f424e4e64/raw/b70776f341a148293ff277afa0d0302c8c38f7e2/gist_stopwords.txt

In [None]:
stopwords_url = "https://gist.githubusercontent.com/ZohebAbai/513218c3468130eacff6481f424e4e64/raw/b70776f341a148293ff277afa0d0302c8c38f7e2/gist_stopwords.txt"
stopwords_file = os.path.join(base_path, "gist_stopwords.txt")

# Download the stopwords file if it does not exist
if not os.path.isfile(stopwords_file):
    # base_path may not exist yet on a fresh run; create it so the write below cannot fail.
    os.makedirs(base_path, exist_ok=True)
    try:
        # A timeout keeps the cell from hanging forever on a stalled connection.
        response = requests.get(stopwords_url, timeout=30)
    except requests.RequestException as e:
        print(f"Failed to download stopwords: {e}")
    else:
        if response.status_code == 200:
            with open(stopwords_file, 'w') as f:
                f.write(response.text)
        else:
            print("Failed to download stopwords")

# Load stopwords from the file
def load_stopwords(file_path):
    """Read a comma-separated stopword file into a set.

    Surrounding whitespace (including the file's trailing newline) is stripped
    from every entry and empty entries are dropped, so the resulting words can
    match tokens exactly.

    Args:
        file_path: Path of the comma-separated stopword file.

    Returns:
        set of stopword strings; an empty set if the file cannot be read
        (a message is printed in that case).
    """
    try:
        with open(file_path, 'r') as file:
            content = file.read()
    except IOError:
        print(f"Error opening or reading the stopwords file: {file_path}")
        return set()

    stripped = (entry.strip() for entry in content.split(","))
    return {entry for entry in stripped if entry}

# Load the stopwords
# NOTE: this rebinds the module-level name `stopwords`. After this line it no
# longer refers to the `stopwords` object imported from nltk.corpus in the
# import cell, but to the plain set returned by load_stopwords (an empty set
# if the file could not be read). process_text reads this set.
stopwords = load_stopwords(stopwords_file)
# print(stopwords)

In [None]:
# Persists the two word dictionaries of a domain as JSON
def save_dictionaries(domain, validated_words, corrected_words_dict, paths):
    """Write the validated and corrected word dictionaries to JSON files.

    The files are ``<domain>_validated_words.json`` and
    ``<domain>_corrected_words_dict.json`` inside ``paths['base']``; existing
    files are overwritten.
    """
    outputs = (
        (f'{domain}_validated_words.json', validated_words),
        (f'{domain}_corrected_words_dict.json', corrected_words_dict),
    )
    for filename, payload in outputs:
        with open(os.path.join(paths['base'], filename), 'w') as handle:
            json.dump(payload, handle)

In [None]:
# Used to load the dictionaries for a given domain
def load_dictionaries(domain, paths):
    """Load the validated and corrected word dictionaries of a domain.

    Each dictionary is read from its JSON file inside ``paths['base']``; a
    missing file yields an empty dict.

    Returns:
        tuple ``(validated_words, corrected_words_dict)``.
    """
    def _read_json_or_empty(filename):
        # A file that was never written simply means "nothing stored yet".
        full_path = os.path.join(paths['base'], filename)
        if not os.path.exists(full_path):
            return {}
        with open(full_path, 'r') as handle:
            return json.load(handle)

    validated = _read_json_or_empty(f'{domain}_validated_words.json')
    corrected = _read_json_or_empty(f'{domain}_corrected_words_dict.json')
    return validated, corrected

In [None]:
# Module-level state shared by spell_check and reset_dictionaries:
#   validated_words      - words that have already been looked at
#   corrected_words_dict - corrections recorded for misspelt words
validated_words = dict()
corrected_words_dict = dict()

In [None]:
def reset_dictionaries():
    """Rebind both module-level word dictionaries to fresh, empty dicts."""
    global validated_words, corrected_words_dict
    validated_words, corrected_words_dict = {}, {}


In [None]:
def fetch_video_ids(domain, max_results=50):
    """Return up to ``max_results`` YouTube video IDs for ``domain``.

    IDs cached in the domain's video-ID JSON file are reused when the cache
    holds at least ``max_results`` entries. Otherwise the YouTube search API is
    paged through (captioned, medium-length videos, English relevance) and the
    collected IDs overwrite the cache file.

    Args:
        domain: Search query and domain name.
        max_results: Maximum number of IDs to return.

    Returns:
        list of video ID strings (possibly fewer than ``max_results`` when the
        search runs out of pages).
    """
    cache_file = setup_domain_paths(domain)['video_ids']

    # Serve from the on-disk cache when it already holds enough IDs.
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as handle:
            cached_ids = json.load(handle)
            if len(cached_ids) >= max_results:
                return cached_ids[:max_results]

    collected = []
    page_token = None
    while len(collected) < max_results:
        still_needed = max_results - len(collected)
        request = youtube.search().list(
            part="id,snippet",
            q=domain,
            type="video",
            videoCaption="closedCaption",      # Ensures videos have captions
            videoDuration="medium",            # Medium duration videos are between 4 minutes and 20 minutes
            maxResults=min(50, still_needed),  # Pages are requested with at most 50 results
            pageToken=page_token,
            relevanceLanguage="en",            # Increases relevance to English language content
        )
        response = request.execute()

        for item in response['items']:
            collected.append(item['id']['videoId'])

        page_token = response.get('nextPageToken')
        if not page_token:
            break

    with open(cache_file, 'w') as handle:
        json.dump(collected, handle)

    return collected


In [None]:
def clean_transcript(transcript):
    """Return the transcript as one line of text without bracketed annotations.

    Annotations such as ``[Music]`` or ``[Applause]`` are removed wherever they
    occur, so speech sharing a segment with an annotation is kept. Line breaks
    and repeated whitespace are collapsed to single spaces.

    Args:
        transcript: Either a list of segments (mappings with a ``'text'`` key)
            or an already-joined string. ``None`` and empty input are accepted.

    Returns:
        The cleaned text; ``''`` when nothing but annotations (or nothing at
        all) was supplied.
    """
    if not transcript:
        return ''

    if isinstance(transcript, list):
        transcript_text = ' '.join(segment['text'] for segment in transcript)
    else:
        # Assume the transcript is already a string
        transcript_text = transcript

    # Replace each "[...]" annotation with a space so neighbouring words stay separated.
    without_annotations = re.sub(r'\[[^\]]*\]', ' ', transcript_text)

    # split()/join collapses newlines and whitespace runs into single spaces.
    return ' '.join(without_annotations.split())


In [None]:
# Used for correcting words using domain context, uses Google's 'Did you mean' feature when searching
def google_spell_check(word, domain):
    """Ask Google search for a spelling suggestion for ``word`` in ``domain``.

    The query is ``"<word> <domain>"``. If the result page contains a
    suggestion link, the domain name is removed from it (case-insensitively,
    because the suggestion's casing need not match ``domain``) and the
    remainder is returned.

    Args:
        word: The word to check.
        domain: Domain name added to the query as context.

    Returns:
        The suggested spelling, or ``word`` unchanged when the request fails,
        the response is not HTTP 200, no suggestion is found, or the suggestion
        is empty once the domain name has been removed.
    """
    query = f"{word} {domain}"
    url = f"https://www.google.com/search?q={quote_plus(query)}"
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36 OPR/83.0.4254.62'}

    try:
        # The timeout stops a stalled connection from blocking a worker thread forever.
        response = requests.get(url, headers=headers, timeout=10)
    except requests.RequestException:
        return word

    if response.status_code != 200:
        return word

    soup = BeautifulSoup(response.text, 'html.parser')
    # NOTE(review): 'gL9Hy' is a CSS class taken from Google's result markup;
    # presumably it changes over time - confirm it still matches.
    suggestion = soup.find('a', {'class': 'gL9Hy'})
    if not suggestion:
        return word

    corrected = re.sub(re.escape(domain), '', suggestion.text, flags=re.IGNORECASE)
    corrected = ' '.join(corrected.split())
    return corrected or word

In [None]:
# Locks guarding the module-level dictionaries; spell_check runs in several
# worker threads at once. When both are needed, validated_words_lock is always
# taken first.
validated_words_lock = Lock()
corrected_words_dict_lock = Lock()
# Spell checks a given text body word by word, caching every decision in the 'validated_words' dictionary
def spell_check(text, domain, paths):
    """Return ``text`` with misspelt words replaced by their corrections.

    Words are split on whitespace and compared in lower case. Words containing
    an apostrophe (contractions) are passed through untouched. For every other
    word, the form to emit is cached in ``validated_words`` (word -> form to
    use), so each distinct word is looked up only once. When the local spell
    checker disagrees with a word, ``google_spell_check`` supplies the
    correction, which is also recorded in ``corrected_words_dict``.

    Args:
        text: Text to check.
        domain: Domain name, passed to ``google_spell_check`` as context.
        paths: Path dict passed through to ``save_dictionaries``.

    Returns:
        The checked text, words joined by single spaces. All words except
        contractions are emitted in lower case.
    """
    spell = SpellChecker()
    corrected_text = []
    changes_made = False

    for word in text.split():
        word_lower = word.lower()
        # Skip contracted words (e.g., it's, can't)
        if "'" in word_lower:
            corrected_text.append(word)
            continue

        with validated_words_lock:
            replacement = validated_words.get(word_lower)

        if replacement is None:
            # First time this word is seen. The (slow) lookups run outside the
            # locks; two threads may occasionally repeat the same lookup, which
            # is harmless.
            replacement = word_lower
            if spell.correction(word_lower) != word_lower:
                # The local checker disagrees with the word; ask Google with domain context.
                suggestion = google_spell_check(word_lower, domain)
                replacement = suggestion or word_lower
                with corrected_words_dict_lock:
                    corrected_words_dict[word_lower] = replacement

            # Cache the form to emit so later occurrences get the same correction.
            with validated_words_lock:
                validated_words[word_lower] = replacement
            changes_made = True

        corrected_text.append(replacement)

    if changes_made:
        # Hold both locks so no thread mutates the dicts while they are being
        # serialised, and so concurrent saves cannot interleave their writes.
        with validated_words_lock:
            with corrected_words_dict_lock:
                save_dictionaries(domain, validated_words, corrected_words_dict, paths)

    return ' '.join(corrected_text)

In [None]:
def download_and_process_transcripts_concurrently(domain, video_ids):
    """Download and process the transcripts of ``video_ids`` in a thread pool.

    One ``download_and_process_transcript`` task is submitted per video ID to
    a pool of 10 workers. An exception raised by a task is printed together
    with its video ID and does not stop the remaining tasks.

    Returns:
        None.
    """
    paths = setup_domain_paths(domain)
    with ThreadPoolExecutor(max_workers=10) as pool:
        # Map every future back to its video ID for error reporting.
        pending = {}
        for vid in video_ids:
            task = pool.submit(download_and_process_transcript, domain, vid, paths)
            pending[task] = vid

        for finished in as_completed(pending):
            finished_id = pending[finished]
            try:
                finished.result()  # Re-raises whatever the task raised
            except Exception as e:
                print(f"Error processing video ID {finished_id}: {e}")

def download_and_process_transcript(domain, video_id, paths):
    """Fetch, clean, spell-check and store the transcript of one video.

    The result is written to ``<video_id>_transcript.txt`` inside
    ``paths['spell_checked']``. Nothing is written when no English transcript
    exists or the cleaned text is empty. Every failure is reported with
    ``print`` instead of being raised.

    Returns:
        None.
    """
    # English variants that are accepted for the transcript
    english_variants = ['en', 'en-US', 'en-GB', 'en-CA', 'en-AU']

    try:
        try:
            raw_transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=english_variants)
        except NoTranscriptFound:
            # No transcript in any of the English variants: nothing to process.
            print(f"No transcript available in standard English variants for video ID {video_id}")
            return

        text = clean_transcript(raw_transcript)
        if not text:
            print(f"Cleaned text is empty for video ID {video_id}.")
            return

        checked_text = spell_check(text, domain, paths)
        target = os.path.join(paths['spell_checked'], f'{video_id}_transcript.txt')
        with open(target, 'w') as handle:
            handle.write(checked_text)

    except Exception as e:
        print(f"Failed to download or process transcript for video ID {video_id}: {e}")



In [None]:
def merge_transcripts(domain):
    """Merge all spell-checked transcripts of a domain into a single file.

    Every regular file in the domain's ``spell_checked`` directory is read in
    sorted file-name order, so the merged text is the same on every run.
    Sub-directories are skipped. The texts are joined with single spaces and
    written to ``<domain>_merged.txt`` in the ``processed`` directory.

    Returns:
        The merged text (the text itself, not the path of the merged file).
    """
    paths = setup_domain_paths(domain)
    source_dir = paths['spell_checked']

    all_transcripts = []
    # os.listdir returns entries in arbitrary order; sort for a reproducible merge.
    for filename in sorted(os.listdir(source_dir)):
        file_path = os.path.join(source_dir, filename)
        if not os.path.isfile(file_path):
            continue  # open() would fail on a directory
        with open(file_path, 'r') as file:
            all_transcripts.append(file.read())

    merged_text = ' '.join(all_transcripts)
    with open(os.path.join(paths['processed'], f'{domain}_merged.txt'), 'w') as file:
        file.write(merged_text)
    return merged_text

In [None]:
# Tokenises, normalises, removes stopwords and lemmatizes
def process_text(text, domain_terms):
    """Turn raw text into a list of lower-cased, lemmatized tokens.

    Tokens are the ``\\w+`` runs of ``text`` (letters, digits and underscores).
    A token is kept when it is a domain term or is not in the module-level
    ``stopwords`` collection; kept tokens are lemmatized with WordNet.

    NOTE(review): the comparison is per single token, so multi-word entries in
    ``domain_terms`` cannot match.

    Args:
        text: Text to process.
        domain_terms: Collection of words that are kept even if they are stopwords.

    Returns:
        list of processed tokens, in their order of appearance.
    """
    lemmatizer = WordNetLemmatizer()

    kept_tokens = []
    for raw_token in re.findall(r'\b\w+\b', text):
        token = raw_token.lower()
        # The stopword test runs on the un-lemmatized token.
        if token in domain_terms or token not in stopwords:
            kept_tokens.append(lemmatizer.lemmatize(token))

    return kept_tokens

Edit the 'min_count' and 'threshold' parameters as needed, read about them here: https://radimrehurek.com/gensim/models/phrases.html

They should ideally scale with how many video transcripts you're processing for the domains

In [None]:
def generate_ngrams(domain, merged_text_path, domain_terms):
    """Generate n-grams from a merged transcript file and save them.

    The merged text is tokenised with ``process_text``; a bigram model and,
    on top of it, a trigram model are trained with gensim ``Phrases``. The
    bigram-level sequence followed by the trigram-level sequence is written,
    space separated, to ``<domain>_ngrams.txt`` in the domain's ``processed``
    directory.

    On-disk encoding: every token is written with ``'_'`` between each of its
    characters, so a single word looks like ``g_a_m_e`` and a detected phrase
    such as ``final_shape`` becomes ``f_i_n_a_l___s_h_a_p_e``. ``format_ngrams``
    reverses this encoding.

    Args:
        domain: Domain name (selects the output directory and file name).
        merged_text_path: Path of the merged transcript text file.
        domain_terms: Terms that must survive stopword removal.

    Returns:
        Path of the written n-gram file.
    """
    paths = setup_domain_paths(domain)
    ngram_path = os.path.join(paths['processed'], f'{domain}_ngrams.txt')

    # Load and process the merged text
    with open(merged_text_path, 'r') as file:
        text = file.read()
    tokens = process_text(text, domain_terms)  # Include domain-specific terms in processing

    # Phrases expects an iterable of sentences, each a list of tokens. The
    # whole corpus is treated as one sentence, so it is wrapped in a list.
    bigram_model = Phrases([tokens], min_count=10, threshold=9)  # min_count and threshold can be adjusted
    # The bigram output is one flat list of strings and must be wrapped the
    # same way; passing it bare would train the model on the characters of
    # every token instead of on the tokens themselves.
    bigram_tokens = list(bigram_model[tokens])
    trigram_model = Phrases([bigram_tokens], threshold=10)

    # Transform the tokens using the models
    bigram = Phraser(bigram_model)
    trigram = Phraser(trigram_model)

    # Applying n-gram models to the tokens. '_'.join(token) joins the
    # CHARACTERS of each token (see the encoding described in the docstring).
    bigrams = ['_'.join(token) for token in bigram[tokens]]
    trigrams = ['_'.join(token) for token in trigram[bigram[tokens]]]

    # Save the n-grams to a file.
    # NOTE(review): both sequences are written, so a token left unchanged by
    # the trigram pass appears (and is later counted) twice.
    with open(ngram_path, 'w') as file:
        file.write(' '.join(bigrams + trigrams))

    return ngram_path



In [None]:
# Deprecated: kept only because calculate_tfidf_from_ngrams still calls it
def prepare_ngram_text_for_tfidf(ngrams_text):
    """Return ``ngrams_text`` unchanged (the underscores are kept as they are)."""
    return ngrams_text

In [None]:
# Makes n-grams nicer to look at (it turns them back into words)
def format_ngrams(ngrams):
    """Decode underscore-encoded n-grams back into readable words.

    Each run of three underscores becomes a space (word boundary inside a
    phrase); every remaining underscore is removed.
    Example: ``'f_i_r_e___f_l_y'`` -> ``'fire fly'``.
    """
    with_spaces = ' '.join(ngrams.split('___'))
    return ''.join(with_spaces.split('_'))


In [None]:
def calculate_tfidf_from_ngrams(ngram_path):
    """Compute TF-IDF scores for the n-grams stored in a file.

    The whole file is vectorised as one document with word 1- to 3-grams; the
    token pattern treats underscores as word characters, so encoded n-grams
    stay single tokens.

    NOTE(review): only one document is passed to the vectorizer, so the IDF
    part presumably cannot discriminate between terms - confirm this is intended.

    Args:
        ngram_path: Path of the n-gram text file.

    Returns:
        dict mapping each feature name to its score; ``{}`` if anything fails
        (the error is printed).
    """
    try:
        with open(ngram_path, 'r') as handle:
            raw_text = handle.read()

        document = prepare_ngram_text_for_tfidf(raw_text)
        vectorizer = TfidfVectorizer(ngram_range=(1, 3), token_pattern=r'(?u)\b\w+\b')  # \w keeps underscores inside tokens
        matrix = vectorizer.fit_transform([document])

        scores = {}
        for column, feature in enumerate(vectorizer.get_feature_names_out()):
            scores[feature] = matrix[0, column]  # row 0 is the only document

        return scores
    except Exception as e:
        print(f"Error calculating TF-IDF for n-grams: {e}")
        return {}


In [None]:
# Subtracts one domain's n-gram counts from another's
def subtract_ngrams(primary_ngrams, secondary_ngrams):
    """Subtract the counts of ``secondary_ngrams`` from ``primary_ngrams``.

    An n-gram present in both is kept only if its remaining count is positive;
    n-grams found only in the primary keep their count. The inputs are not
    modified.

    Returns:
        A new Counter in the primary's order.
    """
    deductions = dict(secondary_ngrams.items())
    remaining = Counter()
    for ngram, count in Counter(primary_ngrams).items():
        if ngram in deductions:
            count -= deductions[ngram]
            if count <= 0:
                continue  # fully cancelled out by the secondary domain
        remaining[ngram] = count
    return remaining

In [None]:
# Saves the 'top_n' most frequent n-grams, from high to low. Default is the top 100.
def save_top_ngrams(domain, ngram_path, top_n=100):
    """Write the most frequent n-grams of a file to ``<domain>_top_ngrams.txt``.

    The n-grams are decoded with ``format_ngrams`` and written one per line
    into the domain's base directory. Any failure is printed, not raised.

    Returns:
        None.
    """
    try:
        ranked = load_ngrams_from_file(ngram_path).most_common(top_n)

        readable = []
        for ngram, _count in ranked:
            readable.append(format_ngrams(ngram))

        base_dir = setup_domain_paths(domain)["base"]
        save_path = os.path.join(base_dir, f"{domain}_top_ngrams.txt")
        with open(save_path, 'w') as handle:
            handle.write('\n'.join(readable))
        print(f"N-grams for {domain} saved to {save_path}")
    except Exception as e:
        print(f"Failed to save top n-grams for {domain}. Error: {e}")

In [None]:
# visualize the terms using a word cloud

def generate_word_cloud(terms, tfidf_scores, title):
    """Show a word cloud of ``terms`` weighted by their TF-IDF scores.

    Each term is displayed in its decoded form (``format_ngrams``). Its weight
    is looked up in ``tfidf_scores`` after replacing spaces with underscores;
    terms without a score get weight 0.
    """
    # Weight of every displayed term
    weights = {}
    for term in terms:
        score_key = term.replace(' ', '_')
        weights[format_ngrams(term)] = tfidf_scores.get(score_key, 0)

    cloud = WordCloud(width=800, height=400).generate_from_frequencies(weights)

    plt.figure(figsize=(10, 5))
    plt.imshow(cloud, interpolation='bilinear')
    plt.axis('off')
    plt.title(title)
    plt.show()


In [None]:
def filter_ngrams_by_tfidf(ngrams, tfidf_scores, threshold=0.1):
    """Keep the n-grams whose TF-IDF score is strictly above ``threshold``.

    N-grams are looked up in ``tfidf_scores`` exactly as given; an n-gram
    without a score counts as 0.

    Returns:
        dict mapping each surviving n-gram to its count, in input order.
    """
    return {
        ngram: count
        for ngram, count in ngrams.items()
        if tfidf_scores.get(ngram, 0) > threshold
    }



In [None]:
def save_filtered_by_tfidf(domain, ngrams, tfidf_scores, top_n=50, threshold=0.1):
    """Save the ``top_n`` highest-scoring n-grams that pass the TF-IDF filter.

    The n-grams are filtered with ``filter_ngrams_by_tfidf``, ranked by score
    (highest first), decoded with ``format_ngrams`` and written one per line
    to ``<domain>_tfidf_filtered_ngrams.txt`` in the domain's base directory.
    When nothing passes the filter a warning is printed and an empty file is
    still written.
    """
    survivors = filter_ngrams_by_tfidf(ngrams, tfidf_scores, threshold)
    if not survivors:
        print(f"No n-grams passed the TF-IDF filtering for {domain}, check TF-IDF scores and threshold.")

    def _score(item):
        # item is an (ngram, count) pair; scores are keyed with underscores
        return tfidf_scores.get(item[0].replace(' ', '_'), 0)

    ranked = sorted(survivors.items(), key=_score, reverse=True)
    formatted_terms = [format_ngrams(term) for term, _count in ranked[:top_n]]

    base_dir = setup_domain_paths(domain)["base"]
    save_path = f'{base_dir}/{domain}_tfidf_filtered_ngrams.txt'
    with open(save_path, 'w') as handle:
        handle.write('\n'.join(formatted_terms))
    print(f"TF-IDF filtered n-grams saved to {save_path}")


In [None]:
def save_unique_terms(domain, unique_terms, top_n=50):
    """Save the ``top_n`` highest-count terms of ``unique_terms``.

    The terms are ranked by count (highest first), decoded with
    ``format_ngrams`` and written one per line to ``<domain>_unique_terms.txt``
    in the domain's base directory.
    """
    def _count(item):
        return item[1]

    ranked = sorted(unique_terms.items(), key=_count, reverse=True)
    readable = []
    for term, _unused in ranked[:top_n]:
        readable.append(format_ngrams(term))

    base_dir = setup_domain_paths(domain)["base"]
    with open(f'{base_dir}/{domain}_unique_terms.txt', 'w') as handle:
        handle.write('\n'.join(readable))


In [None]:
# Loads the n-grams stored at the given path and counts them
def load_ngrams_from_file(file_path):
    """Count the whitespace-separated n-grams stored in ``file_path``.

    Returns:
        Counter mapping each n-gram to its number of occurrences.
    """
    counts = Counter()
    with open(file_path, 'r') as handle:
        counts.update(handle.read().split())
    return counts


This is the main loop

In [None]:
# Dictionary to store paths for n-grams for analysis later
# (domain name -> path of that domain's n-gram file; read by the analysis cells below)
ngram_paths = {}

# Full pipeline, run once per configured domain:
# fetch video IDs -> download + spell-check transcripts -> merge -> n-grams -> save top n-grams
for domain in domains:
    print(f"Processing domain: {domain}")

    # The in-memory dictionaries start empty for every domain; this loop never
    # calls load_dictionaries, so previously saved dictionaries are not reused.
    reset_dictionaries()  # Reset dictionaries for the new domain

    # Fetch video IDs
    video_ids = fetch_video_ids(domain, max_results=num_videos_to_fetch)  # Use a smaller number for initial testing
    print(f"First 5 video IDs for {domain}: {video_ids[:5]}")  # Display first 5 IDs to verify

    # Download and process transcripts, then merge them
    # NOTE(review): download_and_process_transcripts_concurrently has no return
    # statement, so processed_text is always None; it is not used afterwards.
    processed_text = download_and_process_transcripts_concurrently(domain, video_ids)
    print(f"Transcripts processed for {domain}")

    # Merge transcripts into a single file
    # NOTE(review): merge_transcripts returns the merged TEXT, not a path; the
    # name is rebound to the real path a few lines below before it is used.
    merged_text_path = merge_transcripts(domain)
    print(f"Merged transcripts for {domain} saved")

    # Generate n-grams from the merged transcripts
    merged_text_path = os.path.join(setup_domain_paths(domain)['processed'], f'{domain}_merged.txt')
    ngram_path = generate_ngrams(domain, merged_text_path, domain_terms[domain])
    ngram_paths[domain] = ngram_path
    print(f"N-grams for {domain} saved to {ngram_path}")

    # Writes <domain>_top_ngrams.txt into the domain's base directory
    save_top_ngrams(domain, ngram_path, top_n=100)




Processing domain: Destiny 2
First 5 video IDs for Destiny 2: ['yMwc0HrMBvg', 'paakU92hKiw', 'ghXGCov9nig', 'azkPOyt6DUk', '1j8RFA1DMw8']
Transcripts processed for Destiny 2
Merged transcripts for Destiny 2 saved
N-grams for Destiny 2 saved to /content/drive/MyDrive/Colab Notebooks/major_project/Destiny 2/processed_transcripts/Destiny 2_ngrams.txt
N-grams for Destiny 2 saved to /content/drive/MyDrive/Colab Notebooks/major_project/Destiny 2/Destiny 2_top_ngrams.txt
Processing domain: warframe
First 5 video IDs for warframe: ['WUMPxnpmtKQ', '4gi_9Ypouuc', 'WeTjeaPr1O8', 'ZS_XADPeCHw', 'NkNu-eJ8qFs']
Transcripts processed for warframe
Merged transcripts for warframe saved
N-grams for warframe saved to /content/drive/MyDrive/Colab Notebooks/major_project/warframe/processed_transcripts/warframe_ngrams.txt
N-grams for warframe saved to /content/drive/MyDrive/Colab Notebooks/major_project/warframe/warframe_top_ngrams.txt


In [None]:
# Load the saved n-gram files as Counters and preview the 10 most frequent entries.
# The keys are still in the underscore-encoded form written by generate_ngrams.
# NOTE(review): the domain names are hardcoded here rather than taken from `domains`.
destiny_ngrams = load_ngrams_from_file(ngram_paths['Destiny 2'])
warframe_ngrams = load_ngrams_from_file(ngram_paths['warframe'])

print(f"Top 10 n-grams for Destiny 2: {destiny_ngrams.most_common(10)}")
print(f"Top 10 n-grams for warframe: {warframe_ngrams.most_common(10)}")

Top 10 n-grams for Destiny 2: [('g_a_m_e', 88), ('f_i_n_a_l___s_h_a_p_e', 84), ('s_e_t_t_i_n_g', 66), ('g_a_r_d_e_n_e_r', 46), ('p_e_o_p_l_e', 42), ('d_e_s_t_i_n_y', 42), ('d_e_s_t_i_n_y___2', 40), ('u_n_i_v_e_r_s_e', 40), ('t_u_r_n', 36), ('b_u_i_l_d', 36)]
Top 10 n-grams for warframe: [('d_a_m_a_g_e', 140), ('b_o_n_u_s', 106), ('s_h_a_r_d', 90), ('e_c_l_i_p_s_e', 70), ('s_l_o_t', 62), ('v_i_d_e_o', 58), ('t_i_m_e', 44), ('b_u_f_f', 44), ('a_b_i_l_i_t_y', 40), ('r_o_a_r', 40)]


In [None]:
# Subtract each domain's n-gram counts from the other's; what remains are the
# terms that are more frequent in the first domain than in the second.
unique_destiny = subtract_ngrams(destiny_ngrams, warframe_ngrams)
unique_warframe = subtract_ngrams(warframe_ngrams, destiny_ngrams)

# Show only the highest-count terms: printing the whole Counter floods the cell output.
print(f"Unique terms for Destiny 2: {unique_destiny.most_common(20)}")
print(f"Unique terms for warframe: {unique_warframe.most_common(20)}")


Unique terms for Destiny 2: Counter({'g_a_m_e': 84, 'f_i_n_a_l___s_h_a_p_e': 84, 's_e_t_t_i_n_g': 64, 'g_a_r_d_e_n_e_r': 46, 'd_e_s_t_i_n_y': 42, 'd_e_s_t_i_n_y___2': 40, 'u_n_i_v_e_r_s_e': 40, 't_u_r_n': 36, 'l_i_g_h_t': 34, 's_i_m_p_l_y': 32, 'l_a_w': 30, 'p_v_p': 26, 'p_e_o_p_l_e': 24, 'g_r_e_n_a_d_e': 24, 'p_a_t_t_e_r_n': 24, 'w_i_t_n_e_s_s': 24, 'u_n_v_e_i_l_i_n_g': 22, 't_a_r_g_e_t': 20, 'l_i_f_e': 20, 'p_l_a_y_e_r': 20, 'd_a_r_k': 20, 'b_u_i_l_d': 20, 'w_i_n': 20, 's_t_y_l_e': 20, 't_u_r_n_e_d': 18, 'b_l_u_r': 18, 'b_a_s_i_c_a_l_l_y': 18, 's_h_o_t': 18, 'g_r_e_a_t': 18, 'e_n_e_m_y': 18, 'i_d_e_a': 18, 'p_u_r_p_o_s_e': 18, 'a_r_t': 18, 'l_o_t': 16, '1': 16, 'g_u_a_r_d_i_a_n': 16, 'f_i_n_i_s_h_e_r': 16, 't_r_a_n_s_c_e_n_d_e_n_c_e': 16, 'e_x_i_s_t_e_n_c_e': 16, 'c_o_m_p_l_e_x_i_t_y': 16, 'a_i_m_i_n_g': 14, 'f_e_e_l': 14, 'p_v_e': 14, 'd_i_r_e_c_t_i_o_n': 14, 'r_e_s_i_s_t_a_n_c_e': 14, 'b_o_d_y': 14, 'l_o_g_i_c': 14, 'e_x_i_s_t': 14, 't_r_a_v_e_l_e_r': 14, 't_h_o_r_n': 14, 's_c_r_e_

In [None]:
"""
# Plot the distribution of TF-IDF scores
scores = list(tfidf_scores_destiny.values())
plt.hist(scores, bins=30, alpha=0.75)
plt.title('Distribution of TF-IDF Scores for Destiny 2')
plt.xlabel('TF-IDF Score')
plt.ylabel('Frequency')
plt.show()
"""


"\n# Plot the distribution of TF-IDF scores\nscores = list(tfidf_scores_destiny.values())\nplt.hist(scores, bins=30, alpha=0.75)\nplt.title('Distribution of TF-IDF Scores for Destiny 2')\nplt.xlabel('TF-IDF Score')\nplt.ylabel('Frequency')\nplt.show()\n"

In [None]:
# Write the 100 highest-count terms of each subtraction result to
# <domain>_unique_terms.txt in the domain's base directory.
save_unique_terms('Destiny 2', unique_destiny, 100)
save_unique_terms('warframe', unique_warframe, 100)

In [None]:
# Calculate TF-IDF scores for the domain's n-grams
# (each call returns {} and prints an error if the n-gram file cannot be processed)
tfidf_scores_destiny = calculate_tfidf_from_ngrams(ngram_paths['Destiny 2'])
tfidf_scores_warframe = calculate_tfidf_from_ngrams(ngram_paths['warframe'])

# Load n-grams (assuming they are stored in a way that Counter can handle)
# NOTE: this repeats the load already performed in an earlier cell.
destiny_ngrams = load_ngrams_from_file(ngram_paths['Destiny 2'])
warframe_ngrams = load_ngrams_from_file(ngram_paths['warframe'])

# Call the function with the required parameters
# Keeps n-grams scoring above the threshold and writes the 50 best to
# <domain>_tfidf_filtered_ngrams.txt in the domain's base directory.
save_filtered_by_tfidf('Destiny 2', destiny_ngrams, tfidf_scores_destiny, top_n=50, threshold=0.05) # Adjust the threshold as required
save_filtered_by_tfidf('warframe', warframe_ngrams, tfidf_scores_warframe, top_n=50, threshold=0.05)

TF-IDF filtered n-grams saved to /content/drive/MyDrive/Colab Notebooks/major_project/Destiny 2/Destiny 2_tfidf_filtered_ngrams.txt
TF-IDF filtered n-grams saved to /content/drive/MyDrive/Colab Notebooks/major_project/warframe/warframe_tfidf_filtered_ngrams.txt


# **Tests**

In [None]:
"""
# Test for subtract_ngrams
primary_ngrams = Counter({'destiny_game': 10, 'fire_fly': 5, 'alien_ship': 2})
secondary_ngrams = Counter({'destiny_game': 2, 'fire_fly': 5})

unique_ngrams = subtract_ngrams(primary_ngrams, secondary_ngrams)
print("Unique N-Grams after Subtraction:", unique_ngrams)  # Expect 'destiny_game' with 8 and 'alien_ship' with 2
"""


'\n# Test for subtract_ngrams\nprimary_ngrams = Counter({\'destiny_game\': 10, \'fire_fly\': 5, \'alien_ship\': 2})\nsecondary_ngrams = Counter({\'destiny_game\': 2, \'fire_fly\': 5})\n\nunique_ngrams = subtract_ngrams(primary_ngrams, secondary_ngrams)\nprint("Unique N-Grams after Subtraction:", unique_ngrams)  # Expect \'destiny_game\' with 8 and \'alien_ship\' with 2\n'

In [None]:
"""
# Test for calculate_tfidf_from_ngrams
ngram_test_path = r'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents/test_ngrams.txt'  # This file should contain n-grams

tfidf_scores = calculate_tfidf_from_ngrams(ngram_test_path)
print("TF-IDF Scores:", tfidf_scores)"""

'\n# Test for calculate_tfidf_from_ngrams\nngram_test_path = r\'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents/test_ngrams.txt\'  # This file should contain n-grams\n\ntfidf_scores = calculate_tfidf_from_ngrams(ngram_test_path)\nprint("TF-IDF Scores:", tfidf_scores)'

In [None]:
"""
# Test for filter_ngrams_by_tfidf
test_ngrams = {'destiny_game': 10, 'fire_fly': 5, 'alien_ship': 2}
test_tfidf_scores = {'destiny_game': 0.3, 'fire_fly': 0.05, 'alien_ship': 0.01}

filtered_ngrams = filter_ngrams_by_tfidf(test_ngrams, test_tfidf_scores, threshold=0.1)
print("Filtered N-Grams:", filtered_ngrams)  # Expect only 'destiny_game' to be in the output
"""

'\n# Test for filter_ngrams_by_tfidf\ntest_ngrams = {\'destiny_game\': 10, \'fire_fly\': 5, \'alien_ship\': 2}\ntest_tfidf_scores = {\'destiny_game\': 0.3, \'fire_fly\': 0.05, \'alien_ship\': 0.01}\n\nfiltered_ngrams = filter_ngrams_by_tfidf(test_ngrams, test_tfidf_scores, threshold=0.1)\nprint("Filtered N-Grams:", filtered_ngrams)  # Expect only \'destiny_game\' to be in the output\n'

In [None]:
"""# Test for prepare_ngram_text_for_tfidf
test_ngrams_text = "s_e_t_t_i_n_g d_e_s_t_i_n_y l_i_t_e_r_a_l_l_y"
prepared_text = prepare_ngram_text_for_tfidf(test_ngrams_text)
print("Prepared Text for TF-IDF:", prepared_text)  # Manually check if the output is correctly formatted
"""

'# Test for prepare_ngram_text_for_tfidf\ntest_ngrams_text = "s_e_t_t_i_n_g d_e_s_t_i_n_y l_i_t_e_r_a_l_l_y"\nprepared_text = prepare_ngram_text_for_tfidf(test_ngrams_text)\nprint("Prepared Text for TF-IDF:", prepared_text)  # Manually check if the output is correctly formatted\n'

In [None]:
""" # Test for format_ngrams
test_ngram = "f_i_r_e_f_l_y o_r f_i_r_e___f_l_y"
formatted_ngram = format_ngrams(test_ngram)
print("Formatted N-Gram:", formatted_ngram)  # Check if underscores are correctly handled
"""

' # Test for format_ngrams\ntest_ngram = "f_i_r_e_f_l_y o_r f_i_r_e___f_l_y"\nformatted_ngram = format_ngrams(test_ngram)\nprint("Formatted N-Gram:", formatted_ngram)  # Check if underscores are correctly handled\n'

In [None]:
"""
# Test for load_ngrams_from_file
test_ngram_file_path = r'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents/test_ngrams.txt'
loaded_ngrams = load_ngrams_from_file(test_ngram_file_path)
print("Loaded N-Grams:", loaded_ngrams)
"""


'\n# Test for load_ngrams_from_file\ntest_ngram_file_path = r\'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents/test_ngrams.txt\'\nloaded_ngrams = load_ngrams_from_file(test_ngram_file_path)\nprint("Loaded N-Grams:", loaded_ngrams)\n'

In [None]:
"""# Test for fetch_video_ids
test_domain = 'Mechanical Engineering'
video_ids = fetch_video_ids(test_domain, max_results=5)
print("Fetched Video IDs:", video_ids)
"""

'# Test for fetch_video_ids\ntest_domain = \'Mechanical Engineering\'\nvideo_ids = fetch_video_ids(test_domain, max_results=5)\nprint("Fetched Video IDs:", video_ids)\n'

In [None]:
"""# Test for download_and_process_transcripts_concurrently
test_video_ids = ['Yk8u7ID5JtU', '1j8RFA1DMw8']  # Sample video IDs
processed_transcripts = download_and_process_transcripts_concurrently('test_documents', test_video_ids)
"""

"# Test for download_and_process_transcripts_concurrently\ntest_video_ids = ['Yk8u7ID5JtU', '1j8RFA1DMw8']  # Sample video IDs\nprocessed_transcripts = download_and_process_transcripts_concurrently('test_documents', test_video_ids)\n"

In [None]:
"""
# Test for merge_transcripts
test_domain = 'Destiny 2'
merged_text = merge_transcripts(test_domain)
print("Merged Transcripts:", merged_text)"""

'\n# Test for merge_transcripts\ntest_domain = \'Destiny 2\'\nmerged_text = merge_transcripts(test_domain)\nprint("Merged Transcripts:", merged_text)'

In [None]:
"""
# Test for google_spell_check
test_word = "zenovage"
corrected_word = google_spell_check(test_word, "Destiny 2")
print("Misspelt Word:", test_word)
print("Corrected Word:", corrected_word)  # Should return the correct spelling if found by Google's "Did you mean"""


'\n# Test for google_spell_check\ntest_word = "zenovage"\ncorrected_word = google_spell_check(test_word, "Destiny 2")\nprint("Misspelt Word:", test_word)\nprint("Corrected Word:", corrected_word)  # Should return the correct spelling if found by Google\'s "Did you mean'

In [None]:
"""
# Mock data
test_text = "The gaurdian battles against foes in the univrse. Using the zenovage and the vex mythoclass"
test_domain = "Destiny 2"
paths = setup_domain_paths(test_domain)

test_paths = {"validated": "Destiny 2_validated_words.json", "corrected": "Destiny 2_corrected_words_dict.json"}

validated_words = {}
corrected_words_dict = {}

# Run the test
corrected_text = spell_check(test_text, test_domain, paths)
print("Corrected Text:", corrected_text)
"""

'\n# Mock data\ntest_text = "The gaurdian battles against foes in the univrse. Using the zenovage and the vex mythoclass"\ntest_domain = "Destiny 2"\npaths = setup_domain_paths(test_domain)\n\ntest_paths = {"validated": "Destiny 2_validated_words.json", "corrected": "Destiny 2_corrected_words_dict.json"}\n\nvalidated_words = {}\ncorrected_words_dict = {}\n\n# Run the test\ncorrected_text = spell_check(test_text, test_domain, paths)\nprint("Corrected Text:", corrected_text)\n'

In [None]:
# Disabled manual check for process_text using a short sample sentence and a
# two-element set of domain terms. Wrapped in a triple-quoted string, so running
# this cell only echoes the text; remove the surrounding quotes to run it.
# NOTE(review): the snippet only prints the tokens; there is no assertion on
# the expected result, so a regression would have to be spotted by eye.
"""
# Test for process_text
test_text = "Guardians fight with resiliences and strength against enemies using the light and sometimes darkness. The guardian's light never fades."
domain_terms = {'guardians', 'enemies'}  # domain-specific terms
processed_tokens = process_text(test_text, domain_terms)
print("Processed Tokens:", processed_tokens)
"""

'\n# Test for process_text\ntest_text = "Guardians fight with resiliences and strength against enemies using the light and sometimes darkness. The guardian\'s light never fades."\ndomain_terms = {\'guardians\', \'enemies\'}  # domain-specific terms\nprocessed_tokens = process_text(test_text, domain_terms)\nprint("Processed Tokens:", processed_tokens)\n'

In [None]:
# Disabled manual check for generate_ngrams: calls it on a sample merged-text
# file, then reads the returned output path back and prints its contents.
# Wrapped in a triple-quoted string, so running this cell only echoes the text;
# remove the surrounding quotes to run it.
# NOTE: the wrapper string is not raw, so the "\n" in the first print is stored
# as a real line break (visible in the echoed output). The snippet is valid once
# the wrapper quotes are removed, but not if the stored string were exec'd.
# NOTE(review): the input path is an absolute Google Drive location; presumably
# this needs Drive mounted at /content/drive - confirm, or derive it from a
# configurable base directory.
"""
# Path to the input file with sample merged text
test_merged_text_path = r'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents/test_merged.txt'
test_domain = 'test_documents'
test_domain_terms = {'guardian', 'enemies', 'light'}  # domain-specific terms

ngram_output_path = generate_ngrams(test_domain, test_merged_text_path, test_domain_terms)
print("N-grams saved to:", ngram_output_path, "\n")

with open(ngram_output_path, 'r') as file:
    generated_ngrams = file.read()
print("Generated N-grams:", generated_ngrams)
"""

'\n# Path to the input file with sample merged text\ntest_merged_text_path = r\'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents/test_merged.txt\'\ntest_domain = \'test_documents\'\ntest_domain_terms = {\'guardian\', \'enemies\', \'light\'}  # domain-specific terms\n\nngram_output_path = generate_ngrams(test_domain, test_merged_text_path, test_domain_terms)\nprint("N-grams saved to:", ngram_output_path, "\n")\n\nwith open(ngram_output_path, \'r\') as file:\n    generated_ngrams = file.read()\nprint("Generated N-grams:", generated_ngrams)\n'

In [None]:
# Disabled manual check for save_unique_terms with three mock term counts.
# Wrapped in a triple-quoted string, so running this cell only echoes the text;
# remove the surrounding quotes to run it.
# NOTE(review): the printed location is assembled here from `test_base_path`
# and `domain`; it is not returned by save_unique_terms, and `test_base_path`
# is not passed to it - confirm the message matches where the function writes.
# NOTE(review): `test_base_path` is an absolute Google Drive path; presumably
# this needs Drive mounted at /content/drive - confirm.
"""# Mock data for unique terms
unique_terms = {
    'guardian': 100,
    'light': 80,
    'enemy': 50,
}

# Mock domain setup
domain = 'test_documents'
test_base_path = r'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents'

save_unique_terms(domain, unique_terms, top_n=3)
print(f"Unique terms saved to '{test_base_path}/{domain}_unique_terms.txt'")
"""

'# Mock data for unique terms\nunique_terms = {\n    \'guardian\': 100,\n    \'light\': 80,\n    \'enemy\': 50,\n}\n\n# Mock domain setup\ndomain = \'test_documents\'\ntest_base_path = r\'/content/drive/MyDrive/Colab Notebooks/major_project/test_documents\'\n\nsave_unique_terms(domain, unique_terms, top_n=3)\nprint(f"Unique terms saved to \'{test_base_path}/{domain}_unique_terms.txt\'")\n'

In [None]:
# Disabled manual check for save_filtered_by_tfidf with three mock n-grams and
# matching mock TF-IDF scores. Wrapped in a triple-quoted string, so running
# this cell only echoes the text; remove the surrounding quotes to run it.
# NOTE(review): `domain` and `test_base_path` are not defined in this snippet.
# The save_unique_terms snippet earlier in the notebook assigns both, so this
# would raise NameError on a fresh kernel unless that code has run first.
# NOTE(review): the mock keys are per-character underscore joins
# (e.g. 'g_u_a_r_d_i_a_n___l_i_g_h_t'); confirm this matches the n-gram key
# format the real pipeline produces.
"""
# Mock data for n-grams and their TF-IDF scores
ngrams = {'g_u_a_r_d_i_a_n___l_i_g_h_t': 5, 'e_n_e_m_y___d_a_r_k_n_e_s_s': 3, 'b_a_t_t_l_e___f_i_e_l_d': 1}
tfidf_scores = {'g_u_a_r_d_i_a_n___l_i_g_h_t': 0.5, 'e_n_e_m_y___d_a_r_k_n_e_s_s': 0.3, 'b_a_t_t_l_e___f_i_e_l_d': 0.2}

# Test the function
save_filtered_by_tfidf(domain, ngrams, tfidf_scores, top_n=2, threshold=0.1)
print(f"Filtered TF-IDF n-grams saved to '{test_base_path}/{domain}_tfidf_filtered_ngrams.txt'")
"""

'\n# Mock data for n-grams and their TF-IDF scores\nngrams = {\'g_u_a_r_d_i_a_n___l_i_g_h_t\': 5, \'e_n_e_m_y___d_a_r_k_n_e_s_s\': 3, \'b_a_t_t_l_e___f_i_e_l_d\': 1}\ntfidf_scores = {\'g_u_a_r_d_i_a_n___l_i_g_h_t\': 0.5, \'e_n_e_m_y___d_a_r_k_n_e_s_s\': 0.3, \'b_a_t_t_l_e___f_i_e_l_d\': 0.2}\n\n# Test the function\nsave_filtered_by_tfidf(domain, ngrams, tfidf_scores, top_n=2, threshold=0.1)\nprint(f"Filtered TF-IDF n-grams saved to \'{test_base_path}/{domain}_tfidf_filtered_ngrams.txt\'")\n'

In [None]:
# Disabled manual check for save_top_ngrams on an existing n-gram file.
# Wrapped in a triple-quoted string, so running this cell only echoes the text;
# remove the surrounding quotes to run it.
# NOTE(review): `test_base_path` and `domain` are not defined in this snippet,
# so the print would raise NameError on a fresh kernel unless another cell has
# assigned them first.
# NOTE(review): the input file lives under the "Destiny 2" folder, while the
# message is built from `test_base_path` and `domain`; save_top_ngrams receives
# neither, so confirm the message names the file the function actually writes.
# NOTE(review): the input path is an absolute Google Drive location; presumably
# this needs Drive mounted at /content/drive - confirm.
"""
# Path to a file containing 'test' n-grams
ngram_test_path = r'/content/drive/MyDrive/Colab Notebooks/major_project/Destiny 2/processed_transcripts/Destiny 2_ngrams.txt'

save_top_ngrams(ngram_test_path, top_n=5)
print(f"Top n-grams saved to '{test_base_path}/{domain}_top_ngrams.txt'")
"""

'\n# Path to a file containing \'test\' n-grams\nngram_test_path = r\'/content/drive/MyDrive/Colab Notebooks/major_project/Destiny 2/processed_transcripts/Destiny 2_ngrams.txt\'\n\nsave_top_ngrams(ngram_test_path, top_n=5)\nprint(f"Top n-grams saved to \'{test_base_path}/{domain}_top_ngrams.txt\'")\n'