In [3]:
import json
import os
import re
import spacy
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from tqdm import tqdm
import seaborn as sns
from gensim.models import Word2Vec
from sklearn.manifold import TSNE
from nltk.tokenize import sent_tokenize, word_tokenize
import nltk
from nltk.corpus import stopwords
from transformers import BertTokenizer, BertModel
import torch
import requests
from bs4 import BeautifulSoup
import time
import re
import random
import argparse
from urllib.parse import urlparse, urljoin
from collections import deque
import logging
from tqdm import tqdm
import concurrent.futures

In [None]:
class UnityWebScraper:
    def __init__(self, start_urls, output_dir, max_pages=1000, delay=1, concurrent=5):
        self.start_urls = start_urls
        self.output_dir = output_dir
        self.max_pages = max_pages
        self.delay = delay
        self.concurrent = concurrent
        
        # Track visited URLs to avoid duplicates
        self.visited = set()
        
        # URLs to be visited
        self.url_queue = deque(start_urls)
        
        # Create output directory if it doesn't exist
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
            
        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(os.path.join(output_dir, 'scraper.log')),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        # Headers to simulate a real browser
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Referer': 'https://www.google.com/',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        
        # Regular expressions for finding Unity-related content
        self.unity_patterns = [
            r'unity', r'game.*develop', r'component', r'gameobject',
            r'transform', r'scripting', r'c#', r'shader', r'material',
            r'animation', r'physics', r'collision', r'rigidbody',
            r'prefab', r'scene', r'editor', r'UI', r'input', r'camera'
        ]
        
        # Domains that are allowed for scraping
        self.allowed_domains = [
            'unity.com',
            'docs.unity3d.com',
            'learn.unity.com',
            'unity3d.com',
            'forum.unity.com',
            'answers.unity.com',
            'gamedev.stackexchange.com',
            'unitycodemonkey.com',
            'catlikecoding.com',
            'raywenderlich.com',
            'brackeys.com',
            'unity3d.college',
        ]
        
        # Counter for scraped pages
        self.pages_scraped = 0
        
        # Store for scraped data
        self.scraped_data = []

    def is_valid_url(self, url):
        """Check if URL is valid and should be scraped"""
        if not url or not url.startswith('http'):
            return False
            
        # Check if it's a Unity-allowed domain
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        
        # Check if domain is in allowed list
        domain_allowed = any(allowed_domain in domain for allowed_domain in self.allowed_domains)
        
        # Skip URLs with parameters to avoid duplicate content
        has_few_params = len(parsed_url.query) < 30
        
        # Skip certain file types
        skip_extensions = ['.pdf', '.zip', '.exe', '.dmg', '.pkg', '.unitypackage', 
                          '.jpg', '.jpeg', '.png', '.gif', '.svg', '.mp4', '.webm']
        has_valid_extension = not any(url.endswith(ext) for ext in skip_extensions)
        
        # Skip RSS feeds and other non-content pages
        skip_patterns = ['rss', 'feed', 'sitemap', 'login', 'register', 'signup', 'signin']
        has_no_skip_patterns = not any(pattern in url.lower() for pattern in skip_patterns)
        
        return domain_allowed and has_few_params and has_valid_extension and has_no_skip_patterns

    def is_unity_related(self, text):
        """Check if the content is related to Unity"""
        text_lower = text.lower()
        return any(re.search(pattern, text_lower) for pattern in self.unity_patterns)

    def extract_content(self, soup, url):
        """
        Extract relevant content from a web page
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            url (str): URL of the page
            
        Returns:
            dict: Extracted content
        """
        # Extract title
        title = soup.title.text.strip() if soup.title else "No Title"
        
        # Extract meta description
        meta_desc = ""
        meta_tag = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", attrs={"property": "og:description"})
        if meta_tag and meta_tag.get("content"):
            meta_desc = meta_tag["content"].strip()
            
        # Extract main content
        content = ""
        
        # Try to find main content area with common selectors
        main_selectors = [
            "main", "article", ".content", "#content", ".main-content", 
            ".documentation", ".doc-content", ".tutorial-content", ".post-content",
            ".entry-content", "#main-content", ".unity-content", ".manual-content"
        ]
        
        for selector in main_selectors:
            main_content = soup.select_one(selector)
            if main_content:
                content += main_content.get_text(separator=" ", strip=True) + " "
                break
        
        # If no main content area found, use <p> tags as fallback
        if not content:
            paragraphs = soup.find_all("p")
            content = " ".join(p.get_text(strip=True) for p in paragraphs)
        
        # Extract code snippets
        code_snippets = []
        for code_tag in soup.find_all(["code", "pre"]):
            snippet = code_tag.get_text(strip=True)
            if snippet and len(snippet) > 10:  # Ignore very short snippets
                code_snippets.append(snippet)
        
        # Extract headings for structure
        headings = []
        for heading in soup.find_all(["h1", "h2", "h3"]):
            heading_text = heading.get_text(strip=True)
            if heading_text:
                headings.append({
                    "level": int(heading.name[1]),
                    "text": heading_text
                })
        
        # Get categories/tags if available
        categories = []
        for tag in soup.find_all(["a", "span"], class_=["tag", "category", "topic"]):
            tag_text = tag.get_text(strip=True)
            if tag_text:
                categories.append(tag_text)
        
        # Extract timestamp if available
        published_date = None
        date_selectors = [
            'time', '.date', '.published', '.post-date', 
            'meta[property="article:published_time"]'
        ]
        
        for selector in date_selectors:
            date_element = soup.select_one(selector)
            if date_element:
                if date_element.has_attr('datetime'):
                    published_date = date_element['datetime']
                else:
                    published_date = date_element.get_text(strip=True)
                break
        
        # Construct the result
        result = {
            "url": url,
            "title": title,
            "description": meta_desc,
            "content": content,
            "headings": headings,
            "code_snippets": code_snippets,
            "categories": categories,
            "published_date": published_date,
            "scrape_timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        
        return result

    def extract_links(self, soup, base_url):
        """
        Extract links from a web page
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            base_url (str): Base URL for resolving relative links
            
        Returns:
            list: List of extracted links
        """
        links = []
        
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            
            # Skip empty links, javascript, and anchors
            if not href or href.startswith(("javascript:", "#", "mailto:", "tel:")):
                continue
                
            # Resolve relative URLs
            absolute_url = urljoin(base_url, href)
            
            # Check if the URL is valid
            if self.is_valid_url(absolute_url) and absolute_url not in self.visited:
                links.append(absolute_url)
                
        return links

    def scrape_url(self, url):
        """
        Scrape a single URL
        
        Args:
            url (str): URL to scrape
            
        Returns:
            tuple: (success, data, new_links)
        """
        try:
            # Add a random delay
            time.sleep(self.delay + random.uniform(0.1, 0.5))
            
            # Make the request
            response = requests.get(url, headers=self.headers, timeout=10)
            
            # Check if the request was successful
            if response.status_code != 200:
                self.logger.warning(f"Failed to fetch {url}: Status code {response.status_code}")
                return False, None, []
                
            # Parse the HTML
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Check if content is Unity-related
            page_text = soup.get_text()
            if not self.is_unity_related(page_text):
                self.logger.info(f"Skipping {url}: Not Unity-related")
                return False, None, []
                
            # Extract content
            content = self.extract_content(soup, url)
            
            # Extract links
            links = self.extract_links(soup, url)
            
            return True, content, links
            
        except Exception as e:
            self.logger.error(f"Error scraping {url}: {str(e)}")
            return False, None, []

    def process_batch(self, batch):
        """Process a batch of URLs with concurrent requests"""
        results = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrent) as executor:
            future_to_url = {executor.submit(self.scrape_url, url): url for url in batch}
            
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    success, content, links = future.result()
                    
                    if success and content:
                        results.append((content, links))
                        self.visited.add(url)
                except Exception as e:
                    self.logger.error(f"Error processing {url}: {str(e)}")
        
        return results

    def save_batch(self, batch_data, batch_num):
        """Save a batch of scraped data to JSON"""
        filename = os.path.join(self.output_dir, f"unity_data_batch_{batch_num}.json")
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(batch_data, f, indent=2, ensure_ascii=False)
            
        self.logger.info(f"Saved batch {batch_num} with {len(batch_data)} pages to {filename}")

    def run(self):
        """Run the scraper"""
        self.logger.info(f"Starting Unity web scraper with {len(self.start_urls)} seed URLs")
        self.logger.info(f"Max pages: {self.max_pages}, Delay: {self.delay}s, Concurrent: {self.concurrent}")
        
        batch_size = 50  # Number of scraped items per file
        current_batch = []
        batch_count = 1
        
        # Use tqdm for progress tracking
        pbar = tqdm(total=self.max_pages, desc="Scraping Unity docs")
        
        while self.url_queue and self.pages_scraped < self.max_pages:
            # Get a batch of URLs to process
            batch_urls = []
            while self.url_queue and len(batch_urls) < self.concurrent:
                url = self.url_queue.popleft()
                if url not in self.visited:
                    batch_urls.append(url)
                    self.visited.add(url)
            
            if not batch_urls:
                break
                
            # Process the batch
            batch_results = self.process_batch(batch_urls)
            
            # Handle results
            for content, links in batch_results:
                # Add content to current batch
                current_batch.append(content)
                self.pages_scraped += 1
                pbar.update(1)
                
                # Add new links to queue
                for link in links:
                    if link not in self.visited:
                        self.url_queue.append(link)
                
                # Save batch if it reaches batch_size
                if len(current_batch) >= batch_size:
                    self.save_batch(current_batch, batch_count)
                    current_batch = []
                    batch_count += 1
                    
                # Check if we've reached the maximum
                if self.pages_scraped >= self.max_pages:
                    break
        
        # Save any remaining data
        if current_batch:
            self.save_batch(current_batch, batch_count)
            
        pbar.close()
        self.logger.info(f"Scraping complete. Scraped {self.pages_scraped} pages.")


In [None]:
parser = argparse.ArgumentParser(description='Scrape Unity documentation and tutorials')
parser.add_argument('--output', '-o', default='./unity_json_data', help='Output directory')
parser.add_argument('--max-pages', '-m', type=int, default=1000, help='Maximum number of pages to scrape')
parser.add_argument('--delay', '-d', type=float, default=1.0, help='Delay between requests in seconds')
parser.add_argument('--concurrent', '-c', type=int, default=5, help='Number of concurrent requests')
parser.add_argument('--seed-file', '-s', help='File containing seed URLs (one per line)')
args = parser.parse_args(args=[]) 

default_seeds = [
    'https://docs.unity3d.com/Manual/index.html',
    'https://learn.unity.com/',
    'https://docs.unity3d.com/ScriptReference/index.html',
    'https://unity.com/how-to',
    'https://forum.unity.com/',
    'https://gamedev.stackexchange.com/questions/tagged/unity',
]
    
    # Load seed URLs from file if provided
if args.seed_file:
    try:
        with open(args.seed_file, 'r') as f:
            seeds = [line.strip() for line in f if line.strip()]
    except Exception as e:
        print(f"Error loading seed file: {str(e)}")
        print("Using default seeds instead.")
        seeds = default_seeds
else:
    seeds = default_seeds
    
    # Create and run the scraper
scraper = UnityWebScraper(
    start_urls=seeds,
    output_dir=args.output,
    max_pages=args.max_pages,
    delay=args.delay,
    concurrent=args.concurrent
)
    
scraper.run()

2025-04-23 20:50:35,220 - INFO - Starting Unity web scraper with 6 seed URLs
2025-04-23 20:50:35,227 - INFO - Max pages: 1000, Delay: 1.0s, Concurrent: 5


KeyboardInterrupt: 

In [17]:
class UnityNLPProcessor:
    def __init__(self, input_dir, output_dir):
        """
        Initialize NLP processor for Unity documentation
        
        Args:
            input_dir (str): Directory containing JSON files with Unity documentation
            output_dir (str): Directory to save processed data and visualizations
        """
        if not os.path.exists(input_dir):
            raise ValueError(f"Input directory {input_dir} does not exist")
            
        self.input_dir = input_dir
        self.output_dir = output_dir
        
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
        
        # Initialize stopwords
        self.stop_words = set(stopwords.words('english'))
        self.unity_stop_words = {
            'unity', 'game', 'object', 'function', 'method', 'script', 'component',
            'class', 'public', 'private', 'void', 'return', 'using', 'namespace'
        }
        self.stop_words.update(self.unity_stop_words)
        
        # Storage for processed data
        self.processed_docs = []
        self.all_sentences = []
        self.all_tokens = []
        self.pos_tags = []
        self.word_embeddings = {}
        
    def load_json_data(self):
        """Load and combine all JSON files from the input directory"""
        combined_data = []
        print(f"Loading JSON files from {self.input_dir}...")
        for filename in tqdm(os.listdir(self.input_dir)):
            if filename.endswith('.json'):
                file_path = os.path.join(self.input_dir, filename)
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                        if isinstance(data, list):
                            combined_data.extend(data)
                        else:
                            combined_data.append(data)
                except Exception as e:
                    print(f"Error loading {filename}: {str(e)}")
        print(f"Loaded {len(combined_data)} JSON documents")
        return combined_data
    
    def extract_text_from_json(self, json_data):
        """Extract text from JSON data based on common fields"""
        texts = []
        for item in json_data:
            if isinstance(item, dict):
                doc_text = []
                if 'content' in item and isinstance(item['content'], str):
                    doc_text.append(item['content'])
                if 'description' in item and isinstance(item['description'], str):
                    doc_text.append(item['description'])
                if 'title' in item and isinstance(item['title'], str):
                    doc_text.append(item['title'])
                if 'code_snippets' in item and isinstance(item['code_snippets'], list):
                    for snippet in item['code_snippets']:
                        if isinstance(snippet, str):
                            doc_text.append(snippet)
                if 'headings' in item and isinstance(item['headings'], list):
                    for heading in item['headings']:
                        if isinstance(heading, dict) and 'text' in heading:
                            doc_text.append(heading['text'])
                if doc_text:
                    texts.append(' '.join(doc_text))
        print(f"Extracted text from {len(texts)} documents")
        return texts
    
    def preprocess_text(self, texts):
        """Preprocess extracted text: cleaning and sentence tokenization"""
        processed_docs = []
        all_sentences = []
        print("Preprocessing text...")
        for text in tqdm(texts):
            text = re.sub(r'```.*?```', ' ', text, flags=re.DOTALL)
            text = re.sub(r'<code>.*?</code>', ' ', text, flags=re.DOTALL)
            text = re.sub(r'<[^>]+>', ' ', text)
            text = re.sub(r'[^\w\s\.\(\)\[\]\{\}\<\>\+\-\*\/\=\:\;\,\&\|\!\?]', ' ', text)
            text = text.lower()
            text = re.sub(r'\s+', ' ', text).strip()
            processed_docs.append(text)
            sentences = sent_tokenize(text)
            all_sentences.append(sentences)
            self.all_sentences.extend(sentences)
        self.processed_docs = processed_docs
        print(f"Preprocessed {len(processed_docs)} documents into {sum(len(s) for s in all_sentences)} sentences")
        return processed_docs, all_sentences
    
    def tokenize_and_remove_stopwords(self):
        """Tokenize sentences and remove stopwords"""
        tokenized_sentences = []
        print("Tokenizing and removing stopwords...")
        for sentence in tqdm(self.all_sentences):
            tokens = word_tokenize(sentence)
            filtered = [t for t in tokens if t.lower() not in self.stop_words and len(t) > 2]
            if filtered:
                tokenized_sentences.append(filtered)
                self.all_tokens.extend(filtered)
        print(f"Generated {len(tokenized_sentences)} tokenized sentences")
        return tokenized_sentences
    
    def perform_pos_tagging(self):
        """Perform POS tagging using spaCy"""
        print("Performing POS tagging...")
        pos_tags = []
        for doc in tqdm(nlp.pipe(self.all_sentences, batch_size=50, disable=["ner"])):
            pos_tags.extend([(token.text, token.pos_) for token in doc])
        self.pos_tags = pos_tags
        pos_counts = Counter([tag for _, tag in pos_tags])
        with open(os.path.join(self.output_dir, 'pos_distribution.json'), 'w') as f:
            json.dump(pos_counts, f, indent=2)
        print(f"Performed POS tagging on {len(pos_tags)} tokens")
        return pos_tags
    
    def analyze_pos_distribution(self):
        """Analyze and visualize POS distribution"""
        print("Analyzing POS distribution...")
        pos_counts = Counter([tag for _, tag in self.pos_tags])
        plt.figure(figsize=(12, 6))
        labels, values = zip(*sorted(pos_counts.items(), key=lambda x: x[1], reverse=True))
        plt.bar(labels, values)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, 'pos_distribution.png'))
        nouns = [w for w, t in self.pos_tags if t=='NOUN']
        verbs = [w for w, t in self.pos_tags if t=='VERB']
        adjs = [w for w, t in self.pos_tags if t=='ADJ']
        with open(os.path.join(self.output_dir, 'common_words_by_pos.json'), 'w') as f:
            json.dump({
                'nouns': dict(Counter(nouns).most_common(30)),
                'verbs': dict(Counter(verbs).most_common(30)),
                'adjectives': dict(Counter(adjs).most_common(30))
            }, f, indent=2)
        return {'nouns': Counter(nouns).most_common(30), 'verbs': Counter(verbs).most_common(30), 'adjectives': Counter(adjs).most_common(30)}
    
    def train_word2vec(self, tokenized_sentences):
        """Train a Word2Vec model"""
        print("Training Word2Vec model...")
        if not tokenized_sentences:
            print("No tokenized sentences for Word2Vec")
            return None
        model = Word2Vec(sentences=tokenized_sentences, vector_size=100, window=5, min_count=5, workers=4, sg=1, epochs=10)
        model.save(os.path.join(self.output_dir, 'unity_word2vec.model'))
        print(f"Word2Vec model trained with {len(model.wv)} words")
        return model
    
    def visualize_embeddings(self, model, n_words=100):
        """Visualize embeddings with t-SNE"""
        print("Visualizing word embeddings...")
        common = [w for w, _ in Counter(self.all_tokens).most_common(n_words) if w in model.wv]
        if len(common) < 2:
            print("Not enough words for t-SNE")
            return None
        vectors = np.array([model.wv[w] for w in common])
        tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(common)-1))
        coords = tsne.fit_transform(vectors)
        df = pd.DataFrame({'word': common, 'x': coords[:,0], 'y': coords[:,1]})
        plt.figure(figsize=(16,10))
        sns.scatterplot(data=df, x='x', y='y')
        for i, row in df.iterrows():
            plt.annotate(row['word'], (row['x'], row['y']), fontsize=8)
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, 'word_embeddings_tsne.png'))
        return df
    
    def find_similar_words(self, model, words, n=10):
        """Find similar words using Word2Vec"""
        print("Finding similar words...")
        results = {}
        for w in words:
            if w in model.wv:
                results[w] = model.wv.most_similar(w, topn=n)
        with open(os.path.join(self.output_dir, 'similar_words.json'), 'w') as f:
            json.dump(results, f, indent=2)
        return results

    def process_pipeline(self):
        """Run full NLP pipeline and return processed dataset"""
        # Load and extract
        json_data = self.load_json_data()
        texts = self.extract_text_from_json(json_data)
        if not texts:
            print("No text extracted. Aborting.")
            return None
        # Preprocess
        processed_docs, _ = self.preprocess_text(texts)
        # Tokenize & POS
        tokenized = self.tokenize_and_remove_stopwords()
        self.perform_pos_tagging()
        self.analyze_pos_distribution()
        # Word2Vec
        model = None
        if tokenized:
            model = self.train_word2vec(tokenized)
            if model:
                self.visualize_embeddings(model)
                self.find_similar_words(model, [
                    'gameobject','transform','component','rigidbody','collider',
                    'vector','quaternion','material','texture','mesh','animation',
                    'script','monobehaviour','instantiate','destroy','update'
                ])
        print("NLP processing pipeline complete")
        # Create a DataFrame of processed documents
        df = pd.DataFrame({'processed_text': self.processed_docs})
        # Save processed dataset
        output_csv = os.path.join(self.output_dir, 'processed_dataset.csv')
        df.to_csv(output_csv, index=False)
        print(f"Processed dataset saved to {output_csv}")
        return df

# Usage example

In [18]:
if __name__ == "__main__":
    input_dir = "./unity_json_data"
    output_dir = "./unity_nlp_results"
    processor = UnityNLPProcessor(input_dir, output_dir)
    processed_df = processor.process_pipeline()
    if isinstance(processed_df, pd.DataFrame):
        print(processed_df.head())

Loading JSON files from ./unity_json_data...



[A

100%|██████████| 21/21 [00:00<00:00, 264.29it/s]


Loaded 1000 JSON documents
Extracted text from 1000 documents
Preprocessing text...



[A
[A
[A
[A
100%|██████████| 1000/1000 [00:01<00:00, 988.84it/s]


Preprocessed 1000 documents into 15564 sentences
Tokenizing and removing stopwords...



[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
100%|██████████| 15564/15564 [00:03<00:00, 4244.08it/s]

Generated 15528 tokenized sentences
Performing POS tagging...





NameError: name 'nlp' is not defined

In [10]:
import pandas as pd
import re

# 1. Load your processed dataset
df = pd.read_csv("unity_nlp_results\processed_dataset.csv")  # ← adjust filename/path as needed

# 2. Function to remove non-ASCII (and non-English) chars
def clean_text(s: str) -> str:
    if not isinstance(s, str):
        return s
    # drop anything outside the basic ASCII range:
    ascii_only = s.encode("ascii", errors="ignore").decode("ascii")
    # optionally further restrict to letters/numbers/punctuation/whitespace:
    return re.sub(r"[^A-Za-z0-9\s\!\?\,\.\-\'\"]+", "", ascii_only)

# 3. Apply to every object (string) column
for col in df.select_dtypes(include=["object"]).columns:
    df[col] = df[col].apply(clean_text)

# 4. Save out a cleaned CSV
df.to_csv("unity_nlp_results\processed_dataset.csv", index=False)
print(f"Cleaned dataset written to results/your_results_clean.csv")


  df = pd.read_csv("unity_nlp_results\processed_dataset.csv")  # ← adjust filename/path as needed
  df.to_csv("unity_nlp_results\processed_dataset.csv", index=False)


Cleaned dataset written to results/your_results_clean.csv


In [5]:
from huggingface_hub import login
login(token=" ")

In [6]:
import chromadb
import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM

In [7]:
chroma_client =chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("unity_docs")

In [8]:
if torch.cuda.is_available():
  device = torch.device("cuda")
  print("cuda")
elif torch.backends.mps.is_available():
  device = torch.device("mps")
else:
  device = torch.device("cpu")
  print("cpu")

cuda


In [9]:
llama_model_name="meta-llama/Llama-3.2-1B"
tokenizer = AutoTokenizer.from_pretrained(llama_model_name)
model = AutoModelForCausalLM.from_pretrained(llama_model_name).to(device)

In [None]:
import numpy as np

In [13]:
df = pd.read_csv("unity_nlp_results/processed_dataset.csv", encoding="utf-8")
texts = df["processed_text"].astype(str).tolist()
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = embedding_model.encode(texts)
embeddings = np.array(embeddings, dtype=np.float32)

In [None]:
documents = texts 
ids = [f"doc_{i}" for i in range(len(documents))]

In [None]:
collection.add(
    ids=ids,
    documents=documents,
    embeddings=embeddings  # this stays as float32 vectors
)

In [14]:
results = collection.query(
    query_texts=["template"],
    n_results=5
)

for i, doc in enumerate(results['documents'][0]):
    print(f"\nResult {i+1}:\n{doc}")


Result 1:
manual scripting api unity.com unity - manual introduction to scenes unity manual introduction to scenes scene templates the new scene dialog creating a new scene pinning templates locating and editing templates multi-scene editing

Result 2:
enhance your unity projects with our guide on 11 programming patterns. download the sample project and start coding better today! level up your code with design patterns and solid e-book  unity

Result 3:
this post documents how to use the features of the post composer and markdown used by unity discussions. use the table of contents in the top right of the post to navigate the sections. this document is not exhaus powered bydiscourse, best viewed with javascript enabled ask questions, provide feedback, or discuss unity s web resources, including the unity asset store, unity discussions, unity documentation, unity learn, and unity-play. web resources - unity discussions web resources

Result 4:
this post documents how to use the feature

In [15]:
# 1. Define ChromaDB retrieval functions
def retrieve_docs_chromadb(query, top_k=3):
    """
    Retrieve documents from ChromaDB based on the query.
    """
    results = collection.query(
        query_texts=[query],
        n_results=top_k
    )
    retrieved_docs = results['documents'][0] if 'documents' in results and results['documents'] else []
    print("\n🔍 Retrieved Documents:\n", retrieved_docs)
    return retrieved_docs

# 2. Define RAG-based answer generation using LLaMA
def generate_answer(query):
    """
    Generate an answer using the LLaMA model and retrieved documents.
    """
    retrieved_docs = retrieve_docs_chromadb(query, top_k=3)

    if not retrieved_docs:
        return "No relevant information found."

    context = "\n".join(retrieved_docs)
    prompt = f"""
    You are a helpful assistant answering questions about Unity's documentation.
    Context:
    {context}
    Question: {query}
    Answer:
    """

    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    output = model.generate(**inputs, max_new_tokens=150)

    return tokenizer.decode(output[0], skip_special_tokens=True)



In [16]:
import gradio as gr


In [17]:
# Define CoT (Chain-of-Thought) reasoning
def generate_cot_answer(query):
    """
    Generate an answer using Chain-of-Thought reasoning.
    """
    retrieved_docs = retrieve_docs_chromadb(query, top_k=3)

    if not retrieved_docs:
        return "No relevant information found."

    context = "\n".join(retrieved_docs)
    cot_prompt = f"""
    You are an expert in Unity game development. Break down the reasoning step-by-step to answer the following query:
    Context:
    {context}
    Question: {query}
    Answer:
    """

    inputs = tokenizer(cot_prompt, return_tensors="pt").to(device)
    output = model.generate(**inputs, max_new_tokens=300)

    return tokenizer.decode(output[0], skip_special_tokens=True)

# Define ToT (Tree-of-Thought) reasoning
def generate_tot_answer(query, depth=2):
    """
    Generate an answer using Tree-of-Thought reasoning.
    """
    def recursive_reasoning(subquery, current_depth):
        if current_depth > depth:
            return []

        retrieved_docs = retrieve_docs_chromadb(subquery, top_k=3)
        if not retrieved_docs:
            return [f"No relevant information found for: {subquery}"]

        context = "\n".join(retrieved_docs)
        tot_prompt = f"""
        You are an expert in Unity game development. Explore multiple reasoning paths to answer the following query:
        Context:
        {context}
        Question: {subquery}
        Answer:
        """

        inputs = tokenizer(tot_prompt, return_tensors="pt").to(device)
        output = model.generate(**inputs, max_new_tokens=300)
        answer = tokenizer.decode(output[0], skip_special_tokens=True)

        # Recursively explore sub-questions
        subquestions = [f"Sub-question {i+1}: {doc}" for i, doc in enumerate(retrieved_docs)]
        subanswers = []
        for subq in subquestions:
            subanswers.extend(recursive_reasoning(subq, current_depth + 1))

        return [answer] + subanswers

    return "\n".join(recursive_reasoning(query, 1))

# Define GoT (Graph-of-Thought) reasoning
def generate_got_answer(query):
    """
    Generate an answer using Graph-of-Thought reasoning.
    """
    retrieved_docs = retrieve_docs_chromadb(query, top_k=5)

    if not retrieved_docs:
        return "No relevant information found."

    context = "\n".join(retrieved_docs)
    got_prompt = f"""
    You are an expert in Unity game development. Explore interconnected ideas and relationships to answer the following query:
    Context:
    {context}
    Question: {query}
    Answer:
    """

    inputs = tokenizer(got_prompt, return_tensors="pt").to(device)
    output = model.generate(**inputs, max_new_tokens=500)

    return tokenizer.decode(output[0], skip_special_tokens=True)

# Extend Gradio interface to include CoT, ToT, and GoT
def chat_interface_with_reasoning(user_input, reasoning_type="CoT"):
    """
    Chat interface function with reasoning options.
    """
    if reasoning_type == "CoT":
        return generate_cot_answer(user_input)
    elif reasoning_type == "ToT":
        return generate_tot_answer(user_input)
    elif reasoning_type == "GoT":
        return generate_got_answer(user_input)
    else:
        return "Invalid reasoning type. Please choose CoT, ToT, or GoT."

gr.Interface(
    fn=chat_interface_with_reasoning,
    inputs=[
        gr.Textbox(label="Ask your Unity question"),
        gr.Radio(choices=["CoT", "ToT", "GoT"], label="Reasoning Type", value="CoT")
    ],
    outputs=gr.Textbox(label="AI Response"),
    title="🎮 Unity Assistant Chatbot with Reasoning",
    description="Ask anything about Unity development or documentation! Choose a reasoning type: CoT, ToT, or GoT.",
    theme="default"
).launch(share=True)

* Running on local URL:  http://127.0.0.1:7861
* Running on public URL: https://027440a44a0d114c67.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)





🔍 Retrieved Documents:
 ['manual scripting api unity.com unity - manual introduction to collision unity manual introduction to collision collider types trigger colliders collider shapes collider surfaces', 'manual scripting api unity.com unity - manual introduction to rigidbody 2d unity manual introduction to rigidbody 2d how a rigidbody 2d works collider 2d and rigidbody 2d interaction additional resources', 'manual scripting api unity.com unity - manual physics unity manual physics built-in physics engines for object-oriented projects physics engine packages for data-oriented projects additional information resources', 'hi everyone, we just created an updated 7-part video tutorial series on the input system full of tips on how to make the most of various use cases. we cover everything you ll need to get up and running with the input sy hey everyone, a new e-book just dropped for those of you using or planning to use urp in your projects. read on to get key details about the e-book, 

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.



🔍 Retrieved Documents:
 ['manual scripting api unity.com unity - manual introduction to collision unity manual introduction to collision collider types trigger colliders collider shapes collider surfaces', 'manual scripting api unity.com unity - manual introduction to rigidbody 2d unity manual introduction to rigidbody 2d how a rigidbody 2d works collider 2d and rigidbody 2d interaction additional resources', 'manual scripting api unity.com unity - manual physics unity manual physics built-in physics engines for object-oriented projects physics engine packages for data-oriented projects additional information resources', 'hi everyone, we just created an updated 7-part video tutorial series on the input system full of tips on how to make the most of various use cases. we cover everything you ll need to get up and running with the input sy hey everyone, a new e-book just dropped for those of you using or planning to use urp in your projects. read on to get key details about the e-book, 

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
