In [None]:
# ## 1. Import Dependencies

import os
import re
import tiktoken
from typing import List, Dict, Any, Optional, Tuple
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# ## 2. Define Constants and Patterns

# Configure constants
DATA_DIR = "../data"  # relative path of the directory scanned for transcript .txt files
DEFAULT_ENCODING = "utf-8"  # text encoding used when reading transcript files
MAX_TOKENS = 500  # default upper bound on tokens per chunk (chunk_text and callers)
MIN_TOKENS = 300  # default size below which a trailing chunk is merged into the previous one
TOKENIZER_MODEL = "cl100k_base"  # encoding name handed to tiktoken.get_encoding

# Compile regex patterns once for efficiency
# Header fields: capture the rest of the line after the "Title: " / "URL Source: " label.
TITLE_PATTERN = re.compile(r"Title: (.+)")
URL_PATTERN = re.compile(r"URL Source: (.+)")
# DOTALL lets "." cross newlines, so the group runs from "Markdown Content:" to the end of the text.
CONTENT_PATTERN = re.compile(r"Markdown Content:(.+)", re.DOTALL)
# A line (captured) immediately followed by a line consisting only of dashes; MULTILINE anchors "^" at line starts.
SUBTOPIC_PATTERN = re.compile(r"^(.*)\n-+\n", re.MULTILINE)
# One dialogue line: "<speaker> [(HH:MM:SS)](https://youtube.com/watch?v=...&t=<digits>) <text to end of line>".
# The speaker group is \w+, i.e. a single run of word characters directly before the timestamp link.
DIALOGUE_PATTERN = re.compile(r"(?P<speaker>\w+)\s\[\((?P<timestamp>\d{2}:\d{2}:\d{2})\)\]\((?P<url>https:\/\/youtube\.com\/watch\?v=[^&]+&t=\d+)\)\s(?P<text>.+)")


In [None]:
# ## 3. File Loading Functions

def load_transcripts(data_dir=DATA_DIR):
    """
    Load all transcript files from the data directory.

    Every entry of ``data_dir`` whose name ends in ".txt" is read using
    DEFAULT_ENCODING. Files are visited in sorted name order so the result
    (and in particular its first element) is the same on every run.

    Args:
        data_dir: Directory containing transcript files

    Returns:
        A list of transcript strings, one per successfully read file.
        Unreadable files are reported and skipped; a missing or
        non-directory ``data_dir`` is reported and yields an empty list.
    """
    try:
        # os.listdir returns names in arbitrary order; sort for reproducibility.
        file_names = sorted(os.listdir(data_dir))
    except (FileNotFoundError, NotADirectoryError):
        print(f"Data directory not found: {data_dir}")
        return []

    transcripts = []
    for file_name in file_names:
        if not file_name.endswith(".txt"):
            continue
        file_path = os.path.join(data_dir, file_name)
        try:
            with open(file_path, 'r', encoding=DEFAULT_ENCODING) as f:
                transcripts.append(f.read())
        except (IOError, UnicodeDecodeError) as e:
            # Best effort: one bad file must not abort the whole load.
            print(f"Error reading file {file_name}: {e}")
        else:
            print(f"Loaded: {file_name}")
    return transcripts

# Load every transcript from the default DATA_DIR into the notebook-level
# `transcripts` list (raw file contents) and report how many were read.
transcripts = load_transcripts()
print(f"Loaded {len(transcripts)} transcript files")


In [None]:
# ## 4. Basic Transcript Parsing

def parse_transcript(transcript):
    """
    Pull the title, source URL and body out of one raw transcript.

    Each field is located with its module-level compiled pattern; a field
    whose pattern does not match is returned as None and named in a printed
    warning. The content field is stripped of surrounding whitespace.

    Args:
        transcript: Raw transcript text

    Returns:
        Dictionary with "title", "url" and "content" keys
    """
    def _captured(pattern):
        # First capture group of the first match, or None when nothing matches
        found = pattern.search(transcript)
        if found is None:
            return None
        return found.group(1)

    title = _captured(TITLE_PATTERN)
    url = _captured(URL_PATTERN)
    content = _captured(CONTENT_PATTERN)
    if content is not None:
        content = content.strip()

    result = {"title": title, "url": url, "content": content}

    # Report every field that could not be extracted
    missing_fields = [field for field in result if result[field] is None]
    if missing_fields:
        print(f"Warning: Missing fields in transcript: {', '.join(missing_fields)}")

    return result

# Test the parse_transcript function with sample data.
# sample_transcript / parsed_sample are always bound (None when nothing was
# loaded): later cells evaluate `if parsed_sample and ...` unconditionally and
# would raise NameError on an empty data directory if the names were missing.
sample_transcript = transcripts[0] if transcripts else None
parsed_sample = parse_transcript(sample_transcript) if sample_transcript is not None else None

if parsed_sample:
    print("\nSample Transcript Info:")
    print(f"Title: {parsed_sample['title']}")
    print(f"URL: {parsed_sample['url']}")
    print(f"Content Length: {len(parsed_sample['content']) if parsed_sample['content'] else 0} characters")

In [None]:
# ## 5. Tokenization Utilities

# Initialize the shared tiktoken encoder (encoding name comes from
# TOKENIZER_MODEL); get_token_count and chunk_text use this module-level object.
tokenizer = tiktoken.get_encoding(TOKENIZER_MODEL)

def get_token_count(text):
    """
    Return how many tokens the module-level tokenizer produces for a string.

    Args:
        text: Text to count tokens in

    Returns:
        Number of tokens
    """
    token_ids = tokenizer.encode(text)
    return len(token_ids)

def chunk_text(text, max_tokens=MAX_TOKENS, min_tokens=MIN_TOKENS):
    """
    Chunk text based on token count, preserving sentence structure where possible.

    The text is split after sentence-ending punctuation (., !, ?) and the
    sentences are packed greedily into chunks of at most ``max_tokens`` tokens.
    The whitespace that followed each sentence stays attached to it, so line
    breaks and spaces survive inside the chunks and concatenating the returned
    chunks reproduces the input (assuming the tokenizer round-trips text).

    A sentence that alone exceeds ``max_tokens`` is packed word by word
    instead. A single word that alone exceeds ``max_tokens`` is kept whole in
    its own oversize chunk rather than being cut mid-word.

    If the final chunk has fewer than ``min_tokens`` tokens it is appended to
    the previous chunk; that merged chunk may therefore exceed ``max_tokens``
    (by less than ``min_tokens``).

    Args:
        text: Text to chunk
        max_tokens: Maximum tokens per chunk
        min_tokens: Minimum tokens for the trailing chunk before it is merged

    Returns:
        List of text chunks (empty when ``text`` produces no tokens)
    """
    # The capturing group makes re.split return the separators as well:
    # [sentence, whitespace, sentence, whitespace, ..., sentence].
    parts = re.split(r'(?<=[.!?])(\s+)', text)
    sentences = [
        parts[idx] + (parts[idx + 1] if idx + 1 < len(parts) else "")
        for idx in range(0, len(parts), 2)
    ]

    # Token lists of the indivisible pieces, in reading order.
    units = []
    for sentence in sentences:
        sentence_tokens = tokenizer.encode(sentence)
        if len(sentence_tokens) <= max_tokens:
            units.append(sentence_tokens)
            continue
        # Oversized sentence: fall back to words, each keeping its own
        # surrounding whitespace so nothing is lost or normalised.
        words = re.findall(r'\s*\S+\s*', sentence) or [sentence]
        units.extend(tokenizer.encode(word) for word in words)

    chunk_token_lists = []
    current = []  # always a list owned by this function, never an alias of a unit
    for unit in units:
        # Flush only a non-empty chunk, so an oversize unit never emits "".
        if current and len(current) + len(unit) > max_tokens:
            chunk_token_lists.append(current)
            current = []
        current.extend(unit)

    # Handle any remaining content
    if current:
        if chunk_token_lists and len(current) < min_tokens:
            # Too small to stand alone: fold into the previous chunk.
            chunk_token_lists[-1].extend(current)
        else:
            chunk_token_lists.append(current)

    return [tokenizer.decode(token_ids) for token_ids in chunk_token_lists]

# Test the tokenizer on a small sample.
# Runs only when a sample transcript with non-empty content is available.
if parsed_sample and parsed_sample['content']:
    sample_text = parsed_sample['content'][:1000]  # First 1000 chars
    token_count = get_token_count(sample_text)
    print(f"\nSample text token count: {token_count}")
    
    # Test chunking with limits lower than the MAX_TOKENS / MIN_TOKENS defaults
    chunks = chunk_text(sample_text, max_tokens=200, min_tokens=100)
    print(f"Split into {len(chunks)} chunks")
    # Re-count each chunk's tokens to show the resulting sizes
    for i, chunk in enumerate(chunks):
        print(f"Chunk {i+1}: {get_token_count(chunk)} tokens")

In [None]:
# ## 6. Dialogue Extraction Functions

def extract_dialogues(content_block):
    """
    Collect every dialogue line that DIALOGUE_PATTERN finds in a block of text.

    Args:
        content_block: A string containing dialogue text

    Returns:
        Tuple of (dialogues, speakers, timestamp) where
        - dialogues is a list of dicts with "speaker", "text", "timestamp"
          and "url" keys, in order of appearance,
        - speakers lists each distinct speaker once, in first-seen order,
        - timestamp is the "[(time)](url)" link of the first dialogue, or
          None when the block contains no dialogue.
    """
    dialogues = []
    speakers = []
    timestamp = None

    for hit in DIALOGUE_PATTERN.finditer(content_block):
        speaker = hit.group("speaker")
        time = hit.group("timestamp")
        url = hit.group("url")
        text = hit.group("text")

        # Only the first dialogue contributes the block-level timestamp link
        if timestamp is None:
            timestamp = f"[({time})]({url})"

        already_seen = speaker in speakers
        if not already_seen:
            speakers.append(speaker)

        entry = {
            "speaker": speaker,
            "text": text,
            "timestamp": time,
            "url": url
        }
        dialogues.append(entry)

    return dialogues, speakers, timestamp

def format_dialogues(dialogues):
    """
    Render dialogue dictionaries as "speaker: text" lines.

    Args:
        dialogues: List of dialogue dictionaries (each needs "speaker" and "text")

    Returns:
        List of formatted dialogue strings, each ending in a space and newline
    """
    formatted = []
    for d in dialogues:
        formatted.append(f"{d['speaker']}: {d['text']} \n")
    return formatted

In [None]:
# ## 7. Subtopic Processing

def parse_transcript_by_subtopic(data):
    """
    Split a parsed transcript into one chunk per subtopic heading.

    The content is split with SUBTOPIC_PATTERN; each heading is paired with
    the text that follows it, and the dialogues found in that text become the
    chunk's content. Subtopics without any dialogue are reported and skipped.

    Args:
        data: Dictionary with transcript data ("content", plus optional
            "title" and "url" copied into each chunk's metadata)

    Returns:
        List of subtopic chunks; each has "subtopic", "content" (list of
        formatted dialogue strings) and "metadata"
    """
    transcript = data.get("content")
    if not transcript:
        print("Warning: No content found in transcript data")
        return []

    # split() with one capture group yields [preamble, heading, body, heading, body, ...]
    subtopics = SUBTOPIC_PATTERN.split(transcript)
    headings = subtopics[1::2]
    bodies = subtopics[2::2]

    subtopic_count = (len(subtopics) - 1) // 2
    print(f"Found {subtopic_count} subtopics in transcript")

    chunks = []
    for position, heading in enumerate(headings):
        subtopic = heading.strip()
        # A heading without a following body is treated as empty text
        content_block = bodies[position] if position < len(bodies) else ""

        dialogues, speakers, timestamp = extract_dialogues(content_block)
        formatted_text = format_dialogues(dialogues)

        if formatted_text:
            metadata = {
                "speakers": speakers,
                "dialogue_count": len(dialogues),
                "title": data.get("title"),
                "url": data.get("url"),
                "timestamp": timestamp
            }
            chunks.append({
                "subtopic": subtopic,
                "content": formatted_text,
                "metadata": metadata
            })
        else:
            # Nothing matched the dialogue pattern under this heading
            print(f"Warning: Empty content in subtopic '{subtopic}'")

    return chunks

# Let's test parsing by subtopic.
# Runs only when a sample transcript with non-empty content is available.
if parsed_sample and parsed_sample['content']:
    subtopic_chunks = parse_transcript_by_subtopic(parsed_sample)
    print(f"\nExtracted {len(subtopic_chunks)} subtopic chunks")
    
    # Display the first few subtopics
    for i, chunk in enumerate(subtopic_chunks[:3]):  # Show first 3
        print(f"\nSubtopic {i+1}: {chunk['subtopic']}")
        print(f"Speakers: {', '.join(chunk['metadata']['speakers'])}")
        print(f"Dialogue count: {chunk['metadata']['dialogue_count']}")
        # Print a sample of the content (first 2 dialogues);
        # content here is a list of formatted "speaker: text" strings
        if chunk['content']:
            print("Sample content:")
            for dialogue in chunk['content'][:2]:
                print(f"  {dialogue.strip()}")

In [None]:
# ## 8. Token-Based Chunking

def parse_and_chunk_transcript_by_subtopic(data, max_tokens=MAX_TOKENS, min_tokens=MIN_TOKENS):
    """
    Parse transcript and create token-sized chunks within subtopics.

    The content is split into subtopics with SUBTOPIC_PATTERN. A subtopic
    whose formatted dialogue exceeds ``max_tokens`` is split with chunk_text
    into parts labelled "<subtopic> (part i/n)"; smaller subtopics are kept
    whole. Subtopics without any dialogue are skipped.

    Note the two content shapes in the result:
    - split parts carry ``content`` as a single string and metadata with
      ``is_chunked`` True and ``original_subtopic``;
    - unsplit subtopics carry ``content`` as a list of formatted dialogue
      strings and metadata with ``is_chunked`` False and ``dialogue_count``.

    Args:
        data: Dictionary with transcript data ("content", plus optional
            "title" and "url" copied into each chunk's metadata)
        max_tokens: Maximum tokens per chunk
        min_tokens: Minimum tokens per chunk (forwarded to chunk_text)

    Returns:
        List of chunked subtopics
    """
    transcript = data.get("content")
    if not transcript:
        print("Warning: No content found in transcript data")
        return []

    chunks = []
    # split() with one capture group yields [preamble, heading, body, heading, body, ...]
    subtopics = SUBTOPIC_PATTERN.split(transcript)

    subtopic_count = (len(subtopics) - 1) // 2
    print(f"Processing {subtopic_count} subtopics with token limits ({min_tokens}-{max_tokens})")

    total_dialogues = 0
    chunked_subtopics = 0

    for i in range(1, len(subtopics), 2):
        subtopic = subtopics[i].strip()
        content_block = subtopics[i + 1] if i + 1 < len(subtopics) else ""

        dialogues, speakers, timestamp = extract_dialogues(content_block)
        formatted_text = format_dialogues(dialogues)

        total_dialogues += len(dialogues)

        # Skip empty subtopics
        if not formatted_text:
            continue

        # Check if we need to chunk based on token count
        combined_text = ''.join(formatted_text)
        token_count = get_token_count(combined_text)

        if token_count > max_tokens:
            chunked_subtopics += 1
            # Use sentence-aware chunking to preserve dialogue context
            token_chunks = chunk_text(combined_text, max_tokens, min_tokens)
            for chunk_idx, chunk in enumerate(token_chunks):
                chunks.append({
                    "subtopic": f"{subtopic} (part {chunk_idx+1}/{len(token_chunks)})",
                    "content": chunk,  # A single string
                    "metadata": {
                        # Each part gets its own list; sharing one object
                        # would make an edit to one part's speakers show up
                        # in every sibling part.
                        "speakers": list(speakers),
                        "token_count": get_token_count(chunk),
                        "title": data.get("title"),
                        "url": data.get("url"),
                        "timestamp": timestamp,
                        "is_chunked": True,
                        "original_subtopic": subtopic
                    }
                })
        else:
            chunks.append({
                "subtopic": subtopic,
                "content": formatted_text,  # A list of strings
                "metadata": {
                    "speakers": speakers,
                    "dialogue_count": len(dialogues),
                    "token_count": token_count,
                    "title": data.get("title"),
                    "url": data.get("url"),
                    "timestamp": timestamp,
                    "is_chunked": False
                }
            })

    print(f"Processed {total_dialogues} dialogues, chunked {chunked_subtopics} subtopics")
    return chunks

# Run the token-aware chunker on the sample transcript and summarise the result
if parsed_sample and parsed_sample['content']:
    chunked_results = parse_and_chunk_transcript_by_subtopic(parsed_sample)
    print(f"\nCreated {len(chunked_results)} chunks after token-based chunking")

    # Token totals; chunks without a recorded token_count contribute 0
    total_tokens = sum([entry['metadata'].get('token_count', 0) for entry in chunked_results])
    if chunked_results:
        avg_tokens = total_tokens / len(chunked_results)
    else:
        avg_tokens = 0

    print(f"Total tokens: {total_tokens}")
    print(f"Average tokens per chunk: {avg_tokens:.1f}")

    # How many chunks came from splitting an oversized subtopic, and how many did not
    chunked = len([entry for entry in chunked_results if entry['metadata'].get('is_chunked', False)])
    non_chunked = len(chunked_results) - chunked
    print(f"Chunked subtopics: {chunked}")
    print(f"Non-chunked subtopics: {non_chunked}")

In [None]:
# ## 9. Token Count Analysis

def get_token_counts_by_subtopic(subtopics):
    """
    Compute the token count of every subtopic chunk's content.

    Args:
        subtopics: List of subtopic dictionaries with "subtopic" and
            "content" keys; content may be a string or a list of strings

    Returns:
        List of dictionaries with "subtopic" and "token_count" keys, in
        input order
    """
    results = []
    for entry in subtopics:
        body = entry['content']
        # List content is concatenated first; anything else is counted as-is
        body_text = ''.join(body) if isinstance(body, list) else body
        count = get_token_count(body_text)
        results.append({'subtopic': entry['subtopic'], 'token_count': count})
    return results

# Analyze token counts for our sample.
# chunked_results is only created when a sample transcript was available, so
# check that the name exists before using it.
if 'chunked_results' in locals() and chunked_results:
    token_counts = get_token_counts_by_subtopic(chunked_results)
    
    # Convert to DataFrame for easier analysis (columns: subtopic, token_count)
    df_tokens = pd.DataFrame(token_counts)
    
    print("\nToken count statistics:")
    print(f"Min: {df_tokens['token_count'].min()}")
    print(f"Max: {df_tokens['token_count'].max()}")
    print(f"Mean: {df_tokens['token_count'].mean():.1f}")
    print(f"Median: {df_tokens['token_count'].median()}")
    
    # Plot token distribution, with a dashed vertical line marking MAX_TOKENS
    plt.figure(figsize=(10, 6))
    plt.hist(df_tokens['token_count'], bins=10, alpha=0.7)
    plt.axvline(x=MAX_TOKENS, color='r', linestyle='--', label=f'Max tokens limit ({MAX_TOKENS})')
    plt.xlabel('Token Count')
    plt.ylabel('Number of Chunks')
    plt.title('Token Count Distribution Across Chunks')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()


In [None]:
# ## 10. Process All Transcripts

def process_all_transcripts(transcript_list, chunk_by_tokens=True):
    """
    Parse and chunk each transcript, collecting every chunk in one list.

    A failure while handling one transcript is printed and that transcript is
    skipped; the remaining transcripts are still processed.

    Args:
        transcript_list: List of transcript strings
        chunk_by_tokens: When true, use the token-limited chunker; otherwise
            produce one chunk per subtopic

    Returns:
        List of processed transcript chunks
    """
    all_chunks = []

    for i, transcript in enumerate(transcript_list):
        try:
            print(f"\nProcessing transcript {i+1}/{len(transcript_list)}")
            parsed = parse_transcript(transcript)

            # Pick the chunking strategy, then apply it to the parsed transcript
            chunker = (
                parse_and_chunk_transcript_by_subtopic
                if chunk_by_tokens
                else parse_transcript_by_subtopic
            )
            chunks = chunker(parsed)

            all_chunks += chunks
            print(f"Added {len(chunks)} chunks")
        except Exception as e:
            print(f"Error processing transcript {i+1}: {e}")

    return all_chunks

# Process all our transcripts with the default token-based chunking;
# all_processed_chunks is the flat list of chunks used by the cells below.
all_processed_chunks = process_all_transcripts(transcripts)

print(f"\nTotal processed chunks across all transcripts: {len(all_processed_chunks)}")


In [None]:
# ## 11. Analyzing Speaker Distributions

# Bind the aggregates before the guard so they exist even when no chunks were
# produced; the summary cell prints len(all_speakers) unconditionally and
# would otherwise raise NameError.
all_speakers = set()
speaker_count = {}

if all_processed_chunks:
    # Collect all speakers
    for chunk in all_processed_chunks:
        all_speakers.update(chunk['metadata']['speakers'])

    print(f"\nTotal unique speakers across all transcripts: {len(all_speakers)}")
    print(f"Speakers: {', '.join(sorted(all_speakers))}")

    # Count, per speaker, the number of chunks whose metadata lists them
    for chunk in all_processed_chunks:
        for speaker in chunk['metadata']['speakers']:
            speaker_count[speaker] = speaker_count.get(speaker, 0) + 1

    # Plot speaker distribution, most frequent speaker first
    speakers_df = pd.DataFrame(list(speaker_count.items()), columns=['Speaker', 'Chunk Count'])
    speakers_df = speakers_df.sort_values('Chunk Count', ascending=False)

    plt.figure(figsize=(12, 6))
    plt.bar(speakers_df['Speaker'], speakers_df['Chunk Count'], color='skyblue')
    plt.xlabel('Speaker')
    plt.ylabel('Number of Chunks Appearing In')
    plt.title('Speaker Participation Across All Transcripts')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()


In [None]:
# ## 12. Subtopic Analysis

if all_processed_chunks:
    # Analyze subtopics: count how often each chunk label occurs.
    # NOTE(review): chunks produced by splitting an oversized subtopic are
    # labelled "<subtopic> (part i/n)", so each part is counted as its own
    # subtopic here; metadata['original_subtopic'] holds the unsuffixed name
    # for those chunks. Confirm that counting by label is intended.
    subtopics = [chunk['subtopic'] for chunk in all_processed_chunks]
    subtopic_counts = pd.Series(subtopics).value_counts()
    
    print(f"\nTotal unique subtopics: {len(subtopic_counts)}")
    
    # Show top subtopics
    print("\nTop 10 subtopics by frequency:")
    print(subtopic_counts.head(10))
    
    # Plot subtopic distribution (top 15) as a horizontal bar chart
    plt.figure(figsize=(14, 8))
    subtopic_counts.head(15).plot(kind='barh', color='lightgreen')
    plt.xlabel('Count')
    plt.ylabel('Subtopic')
    plt.title('Most Common Subtopics')
    plt.tight_layout()
    plt.show()

In [None]:
# ## 13. Save Processed Data

import json

def save_processed_data(chunks, output_file="processed_transcripts.json"):
    """
    Write the processed chunks to disk as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than escaped. Any failure
    (opening the file or serialising the data) is printed, not raised.

    Args:
        chunks: List of processed transcript chunks
        output_file: Filename to save to
    """
    try:
        with open(output_file, 'w', encoding='utf-8') as handle:
            json.dump(chunks, handle, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error saving data: {e}")
    else:
        # Reached only when the file was written and closed without error
        print(f"\nSaved {len(chunks)} processed chunks to {output_file}")

# Save our processed data to the default output file
# (skipped when there are no chunks to write)
if all_processed_chunks:
    save_processed_data(all_processed_chunks)


In [None]:
# ## 14. Summary and Next Steps

# Final recap of the run. Relies on notebook-level names created by earlier
# cells: transcripts (section 3), all_processed_chunks (section 10) and
# all_speakers (section 11), so those cells must have been executed first.
print("\n## Summary ##")
print(f"- Processed {len(transcripts)} transcript files")
print(f"- Created {len(all_processed_chunks)} content chunks")
print(f"- Identified {len(all_speakers)} unique speakers")

# Static list of suggested follow-up work
print("\n## Next Steps ##")
print("1. Use the processed chunks for further analysis")
print("2. Consider implementing topic modeling to identify key themes")
print("3. Perform sentiment analysis on the dialogue")
print("4. Create speaker interaction networks")