In [None]:
# Modified Cell 1: Dynamic resource allocation for Dask Client
import os
import json
import numpy as np
import pandas as pd
import psutil
from tqdm.auto import tqdm
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import dask.bag as db
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline

# Dynamically determine system resources
def get_system_resources():
    """Size a local Dask cluster from this machine's RAM and CPU count.

    Returns:
        dict with keys
            'worker_count': number of Dask workers to start (always >= 1),
            'memory_per_worker': whole GB budgeted per worker (always >= 1),
            'total_memory': total system RAM in GB (float).
    """
    # Total physical memory in GB
    total_memory = psutil.virtual_memory().total / (1024**3)

    # Prefer physical cores, then logical cores; psutil may return None for
    # either, so fall back to a single core rather than failing on None - 1.
    cpu_count = (
        psutil.cpu_count(logical=False)
        or psutil.cpu_count(logical=True)
        or 1
    )

    # Budget 70% of RAM for Dask. Keep this as a float: truncating here and
    # again below could previously round the per-worker budget down to 0.
    dask_memory = total_memory * 0.7

    # Leave one core for the system, but never start more workers than there
    # are whole GB to hand out, so every worker gets at least 1GB.
    worker_count = max(1, cpu_count - 1)
    worker_count = max(1, min(worker_count, int(dask_memory)))

    # A value of 0 would be rendered as "0GB", which Dask treats as
    # "no memory limit" -- the opposite of what is intended here.
    memory_per_worker = max(1, int(dask_memory / worker_count))

    return {
        'worker_count': worker_count,
        'memory_per_worker': memory_per_worker,
        'total_memory': total_memory
    }

# Get system resources
resources = get_system_resources()
# 'worker_count' is the number of Dask workers derived from the core count,
# not the core count itself, so it is reported as workers.
print(f"System has {resources['total_memory']:.1f}GB memory; using {resources['worker_count']} Dask workers")
print(f"Allocating {resources['worker_count']} workers with {resources['memory_per_worker']}GB each")

# Start a local Dask cluster with dynamically determined resources
cluster = LocalCluster(
    n_workers=resources['worker_count'],
    threads_per_worker=2,
    memory_limit=f"{resources['memory_per_worker']}GB"
)
client = Client(cluster)
print(f"Dashboard link: {client.dashboard_link}")
# Leave the client as the last expression so the notebook renders its summary
client

In [None]:
# Cell 2: Load Theme Dictionary & Optimize Theme Embeddings
# Load per-game theme keywords.
# The encoding is given explicitly so non-ASCII keywords decode the same way
# on every platform instead of depending on the locale default.
with open('game_themes.json', 'r', encoding='utf-8') as f:
    raw = json.load(f)
# JSON object keys are always strings; convert them to int so they can be
# looked up with the integer app ids used when assigning topics.
GAME_THEMES = {int(appid): themes for appid, themes in raw.items()}

# Initialize SBERT embedder
embedder = SentenceTransformer('all-MiniLM-L6-v2')

# Function to get theme embeddings for specific app IDs
# This avoids loading all embeddings at once
def get_theme_embeddings(app_ids):
    """Build theme embedding matrices for the requested app ids.

    For each app id present in ``GAME_THEMES``, every theme's seed phrases are
    encoded with ``embedder`` and averaged into a single vector. The vectors
    are stacked in the iteration order of that game's theme dict, so row ``k``
    corresponds to the ``k``-th theme key.

    Args:
        app_ids: iterable of integer app ids (duplicates are tolerated).

    Returns:
        dict mapping app id -> 2-D numpy array with one row per theme. App ids
        that are unknown, or whose theme dict is empty, are omitted.
    """
    embeddings = {}
    for appid in app_ids:
        if appid in embeddings:
            continue  # duplicate id in the request; already computed
        themes = GAME_THEMES.get(appid)
        if not themes:
            # Unknown game, or a game with no themes: np.vstack([]) would
            # raise, so leave it out and let the caller use its default.
            continue
        embeddings[appid] = np.vstack([
            embedder.encode(seeds, convert_to_numpy=True).mean(axis=0)
            for seeds in themes.values()
        ])
    return embeddings

In [None]:
# Modified Cell 3: Dynamic blocksize for reading Parquet Files
# Estimate dataset size first
def estimate_dataset_size(path):
    """Return the combined on-disk size, in GB, of the ``.parquet`` files
    located directly inside ``path`` (sub-directories are not descended into)."""
    import os

    parquet_bytes = sum(
        os.path.getsize(os.path.join(path, entry))
        for entry in os.listdir(path)
        if entry.endswith('.parquet')
    )
    # Bytes -> GB
    return parquet_bytes / (1024**3)

# Measure the dataset on disk before deciding how to read it
dataset_path = 'parquet_output_theme_combo'
estimated_size = estimate_dataset_size(dataset_path)
print(f"Estimated dataset size: {estimated_size:.2f}GB")

# Larger datasets get smaller blocks to keep per-partition memory down:
# >100GB -> 16MB, >10GB -> 32MB, otherwise 64MB.
blocksize = (
    '16MB' if estimated_size > 100
    else '32MB' if estimated_size > 10
    else '64MB'
)

print(f"Using dynamic blocksize: {blocksize}")

# Lazily read only the columns the pipeline uses
ddf = dd.read_parquet(
    f'{dataset_path}/*.parquet',
    blocksize=blocksize,
    columns=['steam_appid', 'review', 'review_language', 'voted_up'],
)

In [None]:
# Cell 4: Filter & Clean Data
# Restrict to English reviews, then discard rows whose review text is missing
ddf = ddf[ddf['review_language'] == 'english'].dropna(subset=['review'])

In [None]:
# Cell 5: Optimized Partition-wise Topic Assignment
def assign_topic(df_partition):
    """Return a copy of ``df_partition`` with an int64 ``topic_id`` column.

    Each review is embedded with ``embedder`` and matched against the theme
    embeddings of its own game (from ``get_theme_embeddings``); ``topic_id``
    is the row index of the most cosine-similar theme. Reviews of games that
    have no theme embeddings get the default topic 0.

    The input partition is not modified: a new frame is returned, and the
    ``topic_id`` dtype is int64 even for an empty partition so the result
    always agrees with the ``meta`` passed to ``map_partitions``.

    Args:
        df_partition: pandas DataFrame with ``steam_appid`` and ``review`` columns.

    Returns:
        pandas DataFrame: the input columns plus ``topic_id``.
    """
    if df_partition.empty:
        return df_partition.assign(topic_id=np.array([], dtype='int64'))

    # Integer app id per row, in row order
    app_id_array = df_partition['steam_appid'].astype('int64').to_numpy()

    # Only build theme embeddings for games that occur in this partition
    local_theme_embeddings = get_theme_embeddings(np.unique(app_id_array).tolist())

    # Embed every review of the partition in one batched call
    review_embeds = embedder.encode(
        df_partition['review'].tolist(),
        convert_to_numpy=True,
        batch_size=64,
    )

    # Default topic 0; overwritten below for games that have themes.
    topic_ids = np.zeros(len(df_partition), dtype='int64')
    for appid, theme_embs in local_theme_embeddings.items():
        rows = app_id_array == appid
        if rows.any():
            # One similarity matrix per game instead of one call per review
            sims = cosine_similarity(review_embeds[rows], theme_embs)
            topic_ids[rows] = sims.argmax(axis=1)

    return df_partition.assign(topic_id=topic_ids)

# Describe the output schema (input columns plus an int64 topic_id) so Dask
# does not have to infer it, then register the per-partition topic assignment.
meta = ddf._meta.assign(topic_id=np.int64(0))
ddf_with_topic = ddf.map_partitions(assign_topic, meta=meta)

In [None]:
# Modified Cell 6: Dynamic batch sizing for aggregation
# Get unique app IDs
unique_app_ids = ddf['steam_appid'].unique().compute()
total_app_ids = len(unique_app_ids)

# Dynamically determine batch size based on number of app IDs:
# the more games there are, the fewer are aggregated per pass.
if total_app_ids > 1000:  # Very large number of app IDs
    batch_size = 3
elif total_app_ids > 500:  # Medium-large number
    batch_size = 5
elif total_app_ids > 100:  # Medium number
    batch_size = 10
else:  # Smaller number
    batch_size = 20

print(f"Processing {total_app_ids} unique app IDs with batch size {batch_size}")

# Per-batch results; every batch must be kept, not just the last one
all_agg_dfs = []
all_review_dfs = []

# Plain Python list so slicing is positional and isin() gets simple values
app_id_list = unique_app_ids.tolist()

# Process in dynamically sized batches
for i in tqdm(range(0, total_app_ids, batch_size)):
    batch_app_ids = app_id_list[i:i+batch_size]

    # Select this batch's rows BEFORE assigning topics. Filtering the already
    # topic-assigned frame would embed every review in the dataset again for
    # each batch; this way only the batch's own reviews are embedded.
    batch_ddf = ddf[ddf['steam_appid'].isin(batch_app_ids)].map_partitions(
        assign_topic, meta=meta
    )

    # Aggregate for this batch
    agg = batch_ddf.groupby(['steam_appid', 'topic_id']).agg(
        review_count=('review', 'count'),
        likes_sum=('voted_up', 'sum')
    )

    # Collect reviews for this batch
    reviews_series = batch_ddf.groupby(['steam_appid', 'topic_id'])['review'] \
        .apply(lambda x: list(x), meta=('review', object))

    # Compute both in one call so they share the same pass over the data
    agg_df, reviews_df = dd.compute(agg, reviews_series)

    # Convert to DataFrames
    agg_df = agg_df.reset_index()
    reviews_df = reviews_df.reset_index().rename(columns={'review': 'Reviews'})

    # Append to results
    all_agg_dfs.append(agg_df)
    all_review_dfs.append(reviews_df)

In [None]:
# Cell 7: Construct Final Report DataFrame
# Combine the results of ALL aggregation batches. Merging only the loop
# variables left over from the last iteration would silently drop every game
# except those in the final batch.
if not all_agg_dfs:
    raise ValueError("No aggregation batches available; run the aggregation cell first")
agg_all = pd.concat(all_agg_dfs, ignore_index=True)
reviews_all = pd.concat(all_review_dfs, ignore_index=True)

# Merge counts, likes, and reviews
report_df = pd.merge(
    agg_all,
    reviews_all,
    on=['steam_appid', 'topic_id'],
    how='left'
)

# Build the final output structure
rows = []
for _, row in report_df.iterrows():
    appid = int(row['steam_appid'])
    tid = int(row['topic_id'])

    # topic_id is a position within the game's theme dict; fall back to a
    # placeholder when the game is unknown or the position is out of range.
    theme_keys = list(GAME_THEMES.get(appid, {}))
    if 0 <= tid < len(theme_keys):
        theme_name = theme_keys[tid]
    else:
        theme_name = f"Unknown Theme {tid}"

    total = int(row['review_count'])
    likes = int(row['likes_sum'])
    like_ratio = f"{(likes / total * 100):.1f}%" if total > 0 else '0%'
    rows.append({
        'steam_appid': appid,
        'Theme': theme_name,
        '#Reviews': total,
        'LikeRatio': like_ratio,
        'Reviews': row['Reviews']
    })

final_report = pd.DataFrame(rows)

# Save intermediate results to avoid recomputation if summarization fails.
# Create the output directory first so the write cannot fail on a fresh checkout.
os.makedirs('output_csvs', exist_ok=True)
final_report.to_csv('output_csvs/SBERT_DD_new_report.csv', index=False)

In [None]:
# Cell 8: View the Report
if final_report.empty:
    # An empty report has no columns to select and no first row to sample
    print("Final report is empty; nothing to preview.")
else:
    # Print preview of the DataFrame (excluding the Reviews column as it contains lists)
    print("Final report preview (Reviews column contains lists of review texts):")
    print(final_report[['steam_appid', 'Theme', '#Reviews', 'LikeRatio']].head())

    # Verify that Reviews column contains lists
    sample_reviews = final_report['Reviews'].iloc[0]
    print(f"\nSample from first Reviews entry (showing first review only):")
    if isinstance(sample_reviews, list) and len(sample_reviews) > 0:
        print(f"Number of reviews in list: {len(sample_reviews)}")
        print(f"First review (truncated): {sample_reviews[0][:100]}...")

# Release the cluster as well as the client; closing only the client leaves
# the LocalCluster started in the first cell running.
client.close()
cluster.close()

In [None]:
# Cell 9: Dynamically optimized GPU hierarchical summarization with Dask

import pandas as pd
import numpy as np
import torch
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from tqdm.auto import tqdm
import time
import os
import psutil
import json
import threading

# 1. Dynamic resource allocation based on system capabilities
def get_system_resources():
    """Inspect RAM, CPU and GPU and derive the summarization-stage settings.

    Prints a short hardware report and returns a dict with the keys
    'worker_count', 'memory_per_worker' (GB, float), 'chunk_size',
    'gpu_available', 'gpu_count' and 'gpu_memory' (list of GB per device).
    """
    bytes_per_gb = 1024**3

    # Memory figures in GB
    total_memory = psutil.virtual_memory().total / bytes_per_gb
    available_memory = psutil.virtual_memory().available / bytes_per_gb
    # Physical cores if known, otherwise logical cores
    cpu_count = psutil.cpu_count(logical=False) or psutil.cpu_count(logical=True)

    gpu_available = torch.cuda.is_available()
    if gpu_available:
        gpu_count = torch.cuda.device_count()
        gpu_memory = [
            torch.cuda.get_device_properties(device).total_memory / bytes_per_gb
            for device in range(gpu_count)
        ]
        # GPU workloads: fewer workers, each with more memory
        worker_count = min(max(1, cpu_count // 2), gpu_count + 1)
    else:
        gpu_count = 0
        gpu_memory = []
        # CPU workloads: one worker per core, keeping one core free
        worker_count = max(1, cpu_count - 1)

    # Hand out 70% of the currently available memory, split evenly
    safe_memory = available_memory * 0.7
    memory_per_worker = safe_memory / worker_count

    # More memory per worker allows more reviews per chunk
    chunk_size = 300 if memory_per_worker > 8 else (200 if memory_per_worker > 4 else 100)

    print(f"System resources: {total_memory:.1f}GB total RAM, {available_memory:.1f}GB available")
    print(f"CPU cores: {cpu_count}, GPU count: {gpu_count}")
    for i, mem in enumerate(gpu_memory):
        print(f"GPU {i}: {mem:.1f}GB memory")

    return {
        'worker_count': worker_count,
        'memory_per_worker': memory_per_worker,
        'chunk_size': chunk_size,
        'gpu_available': gpu_available,
        'gpu_count': gpu_count,
        'gpu_memory': gpu_memory
    }

# Probe the machine once; later code reads the settings from `resources`
resources = get_system_resources()

# Checkpoints are written under ./checkpoints, so make sure it exists
os.makedirs('checkpoints', exist_ok=True)

# Bring up a local Dask cluster sized from the probed resources
n_workers = resources['worker_count']
print(f"Starting Dask cluster with {n_workers} workers, {resources['memory_per_worker']:.1f}GB per worker")
cluster = LocalCluster(
    memory_limit=f"{resources['memory_per_worker']:.1f}GB",
    threads_per_worker=2,
    n_workers=n_workers,
)
client = Client(cluster)
print(f"Dask dashboard available at: {client.dashboard_link}")

# 2. Determine model based on available resources
def select_model():
    """Choose a summarization checkpoint that matches the detected hardware.

    Reads the module-level ``resources`` dict ('gpu_available', 'gpu_memory').

    Returns:
        str: Hugging Face model id.
            * any GPU with more than 8GB -> 'facebook/bart-large-cnn' (largest),
            * any other GPU              -> 'sshleifer/distilbart-cnn-12-6'
              (distilled from bart-large-cnn, smaller),
            * CPU only                   -> 'sshleifer/distilbart-cnn-6-6'
              (smallest of the three).

    All three are fine-tuned for summarization. A plain pretrained checkpoint
    such as 'facebook/bart-base' is not, so it is not a usable CPU fallback.
    """
    if resources['gpu_available'] and any(mem > 8 for mem in resources['gpu_memory']):
        # High-end GPU: the full-size model fits comfortably
        return 'facebook/bart-large-cnn'
    elif resources['gpu_available']:
        # Smaller GPU: use the distilled variant of the same model
        return 'sshleifer/distilbart-cnn-12-6'
    else:
        # CPU only: smallest summarization checkpoint
        return 'sshleifer/distilbart-cnn-6-6'

# Select model based on resources.
# The choice is made once at module level; the worker function reads
# MODEL_NAME when it loads the tokenizer and model.
MODEL_NAME = select_model()
print(f"Selected model: {MODEL_NAME}")

# 3. First, load the data and check for existing checkpoints
def load_with_checkpoint(checkpoint_file='checkpoints/summarization_progress.json'):
    """Return the rows of ``final_report`` that still need a summary.

    Args:
        checkpoint_file: path of the JSON checkpoint, a mapping of row index
            (as a string) to summary text.

    Returns:
        tuple ``(remaining_df, checkpoint)``:
            remaining_df: rows of ``final_report`` whose index is not in the
                checkpoint (a copy), or ``final_report`` itself when there is
                no usable checkpoint.
            checkpoint: the loaded mapping, or ``{}`` when there is no usable
                checkpoint.

    A checkpoint that cannot be read or parsed (for example a file truncated
    by an interrupted write) is reported and ignored rather than aborting.
    """
    if not os.path.exists(checkpoint_file):
        print("No checkpoint found, processing all items")
        return final_report, {}

    try:
        with open(checkpoint_file, 'r') as f:
            checkpoint = json.load(f)
        if not isinstance(checkpoint, dict):
            raise ValueError("checkpoint is not a JSON object")
        # Keys are row indices stored as strings
        completed_indices = [int(key) for key in checkpoint]
    except (OSError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass
        print(f"Ignoring unreadable checkpoint ({e}), processing all items")
        return final_report, {}

    print(f"Found checkpoint with {len(checkpoint)} completed summaries")

    # Filter the dataframe to only process remaining rows
    remaining_df = final_report[~final_report.index.isin(completed_indices)].copy()

    print(f"Resuming processing for {len(remaining_df)} remaining items")
    return remaining_df, checkpoint

# Load data with checkpoint support:
# df_to_process holds the rows that still need a summary, and
# existing_summaries holds whatever a previous run already saved (empty dict if none).
df_to_process, existing_summaries = load_with_checkpoint()

# 4. Prepare partitions with optimized distribution
@dask.delayed
def prepare_partition(start_idx, end_idx, df):
    """Return an independent copy of rows ``start_idx:end_idx`` (positional) of ``df``.

    Wrapped in ``dask.delayed``, so calling it only builds a task; the slice
    is taken when the task is computed.

    NOTE(review): ``df`` is passed in as an argument, so the frame given by
    the caller travels with each task -- confirm this is acceptable for large
    frames.
    """
    partition = df.iloc[start_idx:end_idx]
    return partition.copy()

# Distribute the remaining work as evenly as possible: every partition gets
# `partition_size` rows and the first `extra_rows` partitions get one more.
# (Giving the whole remainder to the last partition left the other workers
# idle whenever there were fewer rows than workers.)
partition_size, extra_rows = divmod(len(df_to_process), n_workers)
partitions = []
start_idx = 0
for i in range(n_workers):
    end_idx = start_idx + partition_size + (1 if i < extra_rows else 0)
    # Slice here, on the client, so each task carries only its own rows
    # instead of a reference to the complete DataFrame.
    partitions.append(df_to_process.iloc[start_idx:end_idx].copy())
    start_idx = end_idx

# 5. Worker processing function with dynamic GPU batch sizing
@dask.delayed
def process_partition(partition_df, worker_id):
    """Summarize the ``Reviews`` list of every row in one partition.

    Loads the model named by the module-level ``MODEL_NAME`` (with a plain
    default-configuration fallback if the first attempt fails), then produces
    one summary per row. Long review lists are summarized hierarchically:
    chunks of reviews are summarized first, then the chunk summaries are
    summarized again.

    Args:
        partition_df: DataFrame whose ``Reviews`` column holds a list of
            review strings per row.
        worker_id: integer label used in log messages and as the position of
            this task's progress bar.

    Returns:
        list of ``(index_label, summary_text)`` tuples, one per attempted row.
        Rows that failed carry an ``"Error..."`` text. The list is empty if no
        model could be loaded. Processing stops early after
        ``MAX_CONSECUTIVE_FAILURES`` rows fail back to back; the remaining rows
        are then absent from the result.
    """
    # Import packages needed in the worker
    from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
    import torch
    import gc

    # Give up on the partition after this many rows fail in a row
    MAX_CONSECUTIVE_FAILURES = 5
    # Generation settings shared by every summarizer call
    SUMMARY_KWARGS = dict(max_length=60, min_length=20, truncation=True, do_sample=False)

    # Determine optimal GPU batch size based on available memory
    def determine_gpu_batch_size():
        if not torch.cuda.is_available():
            return 8  # Conservative default for CPU

        try:
            # Total memory of device 0, in GB
            total_mem = torch.cuda.get_device_properties(0).total_memory / (1024**3)
            # Reserve 10% for system processes and overhead
            usable_mem = total_mem * 0.9

            # Scale batch size based on available GPU memory
            if usable_mem > 16:  # High-end GPU with >16GB
                return 64
            elif usable_mem > 8:  # Mid-range GPU with >8GB
                return 32
            elif usable_mem > 4:  # Lower-end GPU with >4GB
                return 16
            else:  # Minimal GPU
                return 8
        except Exception as e:
            print(f"Error determining GPU batch size: {e}")
            return 8  # Conservative fallback

    # Worker initialization with error handling
    try:
        # Load tokenizer first
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

        # Configure device placement based on available resources
        if torch.cuda.is_available():
            device_map = "auto"
            dtype = torch.float16  # Use half precision with GPU
        else:
            device_map = None
            dtype = torch.float32  # Use full precision with CPU

        # Load model with appropriate configuration
        model = AutoModelForSeq2SeqLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=dtype,
            device_map=device_map,
            low_cpu_mem_usage=True
        )

        # Create pipeline with model AND tokenizer
        summarizer = pipeline(
            task='summarization',
            model=model,
            tokenizer=tokenizer,
            framework='pt',
            model_kwargs={"use_cache": True}
        )

        # Report worker status
        if torch.cuda.is_available():
            gpu_mem = torch.cuda.memory_allocated(0) / (1024**3)
            print(f"Worker {worker_id}: GPU Memory: {gpu_mem:.2f}GB allocated")
            MAX_GPU_BATCH_SIZE = determine_gpu_batch_size()
            print(f"Worker {worker_id}: Using GPU batch size: {MAX_GPU_BATCH_SIZE}")
        else:
            MAX_GPU_BATCH_SIZE = 8
            print(f"Worker {worker_id}: Using CPU with batch size: {MAX_GPU_BATCH_SIZE}")
    except Exception as e:
        print(f"Worker {worker_id} initialization error: {e}")
        # Fall back to a simpler configuration
        try:
            print(f"Falling back to CPU-only mode for worker {worker_id}")
            tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
            model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
            summarizer = pipeline("summarization", model=model, tokenizer=tokenizer)
            MAX_GPU_BATCH_SIZE = 4  # Conservative batch size for fallback mode
        except Exception as e2:
            print(f"Critical failure in worker {worker_id}: {e2}")
            return []  # Return empty results to avoid deadlock

    def summarize_text(text):
        """Summarize a single document and return the summary string."""
        return summarizer(text, **SUMMARY_KWARGS)[0]['summary_text']

    # Efficient batch processing function with memory management
    def process_chunks_batched(chunks):
        """Summarize ``chunks`` in batches; always returns one entry per chunk."""
        all_summaries = []

        # Process in dynamically sized batches
        for i in range(0, len(chunks), MAX_GPU_BATCH_SIZE):
            batch = chunks[i:i+MAX_GPU_BATCH_SIZE]
            try:
                batch_summaries = summarizer(batch, **SUMMARY_KWARGS)
                all_summaries.extend([s["summary_text"] for s in batch_summaries])

                # Proactively manage memory
                if i % (MAX_GPU_BATCH_SIZE * 2) == 0 and torch.cuda.is_available():
                    torch.cuda.empty_cache()

            except Exception as e:
                print(f"Error in batch {i//MAX_GPU_BATCH_SIZE} of worker {worker_id}: {e}")
                # Try smaller batch on failure
                if len(batch) > 1:
                    print("Retrying with smaller batches...")
                    for single_item in batch:
                        try:
                            summary = summarizer([single_item], **SUMMARY_KWARGS)
                            all_summaries.append(summary[0]["summary_text"])
                        except Exception as e2:
                            print(f"Failed to process single item: {e2}")
                            all_summaries.append("Error generating summary.")
                else:
                    all_summaries.append("Error generating summary.")

                # Clean up after errors
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                gc.collect()

        return all_summaries

    # Hierarchical summary function with adaptive chunking
    def hierarchical_summary(reviews, base_chunk_size=200):
        """Create hierarchical summary with adaptive chunk sizing"""
        # Defense against empty or invalid reviews
        if not reviews or not isinstance(reviews, list):
            return "No reviews available for summarization."

        # If there are fewer than chunk_size, just do one summary
        if len(reviews) <= base_chunk_size:
            try:
                # Join reviews with clear separation
                return summarize_text("\n\n".join(reviews))
            except Exception as e:
                print(f"Error summarizing small batch: {e}")
                # Retry on the first half; keep at least one review so a
                # single-review list does not degrade to an empty document.
                try:
                    half_size = max(1, len(reviews) // 2)
                    return summarize_text("\n\n".join(reviews[:half_size]))
                except Exception as e2:
                    print(f"Error summarizing reduced batch: {e2}")
                    return "Error generating summary for this batch."

        # Adaptively determine chunk size based on review length
        # If reviews are very short, use larger chunks
        avg_review_len = sum(len(r) for r in reviews[:100]) / min(100, len(reviews))
        if avg_review_len < 100:  # Very short reviews
            chunk_size = min(base_chunk_size * 2, 500)
        elif avg_review_len > 500:  # Very long reviews
            chunk_size = max(base_chunk_size // 2, 50)
        else:
            chunk_size = base_chunk_size

        print(f"Worker {worker_id}: Using chunk size {chunk_size} for avg review length {avg_review_len:.1f}")

        # Prepare all chunks for processing
        all_chunks = [
            "\n\n".join(reviews[i:i+chunk_size])
            for i in range(0, len(reviews), chunk_size)
        ]

        # Bound before the try block so the except branch below can always
        # inspect it, even if the very first step raises.
        intermediate_summaries = []
        try:
            intermediate_summaries = process_chunks_batched(all_chunks)

            # Summarize the intermediate summaries
            return summarize_text(" ".join(intermediate_summaries))
        except Exception as e:
            print(f"Error in hierarchical summarization: {e}")
            # Try to salvage what we can
            if intermediate_summaries:
                try:
                    return f"Partial summary: {' '.join(intermediate_summaries[:3])}"
                except Exception as e2:
                    print(f"Could not build partial summary: {e2}")
            return "Error generating hierarchical summary."

    # Process the partition
    results = []
    processed_count = 0
    success_count = 0
    consecutive_failures = 0

    # Create a progress bar for this worker
    with tqdm(total=len(partition_df), desc=f"Worker {worker_id}", position=worker_id) as pbar:
        for idx, row in partition_df.iterrows():
            # Stop once too many rows in a row have failed; the untouched
            # rows are simply left out of the result.
            if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                print(f"Worker {worker_id} failing consistently, aborting")
                break

            try:
                # Process the review with the adaptive chunk size
                summary = hierarchical_summary(row['Reviews'], base_chunk_size=resources['chunk_size'])
                results.append((idx, summary))
                processed_count += 1

                # hierarchical_summary reports its own failures as texts
                # beginning with "Error"; count those as failures too.
                if summary.startswith("Error"):
                    consecutive_failures += 1
                else:
                    consecutive_failures = 0
                    success_count += 1

                # Clean up every few iterations
                if processed_count % 5 == 0:
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    gc.collect()

                # Log progress every 10 items
                if processed_count % 10 == 0:
                    print(f"Worker {worker_id}: Processed {processed_count}/{len(partition_df)} items")

            except Exception as e:
                print(f"Error processing row {idx} in worker {worker_id}: {e}")
                # Still record the error so we know this row was attempted
                results.append((idx, "Error: Failed to generate summary."))
                consecutive_failures += 1

            pbar.update(1)

    # Final cleanup
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()

    print(f"Worker {worker_id} completed: {success_count}/{len(partition_df)} successful")
    return results

# 6. Build one delayed summarization task per partition (nothing runs yet)
print(f"Scheduling {n_workers} partitions for processing...")
delayed_results = []
for i in range(n_workers):
    delayed_results.append(process_partition(partitions[i], i))
    print(f"Scheduled partition {i+1}/{n_workers}")

# 7. Progress tracking and checkpointing
# Overall progress bar, measured in rows
print("\nStarting distributed computation with progress tracking:")
main_progress = tqdm(desc="Overall Progress", total=len(df_to_process))

# Wall-clock start, used for the final timing report
start_time = time.time()

# Create a global progress updater with checkpointing
def update_main_progress(futures, poll_interval=5):
    """Update the overall progress bar and persist finished results.

    Intended to run in a background thread. Until the module-level
    ``stop_flag`` becomes true it periodically
      * sets ``main_progress`` to the fraction of finished futures, and
      * merges the results of newly finished futures into the checkpoint file.
    After ``stop_flag`` is set, one last merge is done so that results which
    finished during the final wait are not lost.

    Args:
        futures: sequence of future-like objects with a ``status`` attribute
            and a ``result()`` method returning ``(index, summary)`` pairs.
        poll_interval: seconds between polls.
    """
    checkpoint_file = 'checkpoints/summarization_progress.json'
    # Start from what a previous run saved; never mutate the caller's dict
    summaries_so_far = existing_summaries.copy()
    # Positions of futures whose results were already merged, so each result
    # is fetched from the cluster only once.
    harvested = set()

    def harvest():
        """Merge results of newly finished futures; return True if any were added."""
        changed = False
        for position, future in enumerate(futures):
            if position in harvested or future.status != 'finished':
                continue
            harvested.add(position)
            try:
                for idx, summary in future.result():
                    summaries_so_far[str(idx)] = summary
                changed = True
            except Exception as e:
                # Keep monitoring, but do not hide the problem
                print(f"Could not read result of partition {position}: {e}")
        return changed

    def save_checkpoint():
        """Write the checkpoint via a temp file so readers never see a partial file."""
        os.makedirs(os.path.dirname(checkpoint_file) or '.', exist_ok=True)
        tmp_file = checkpoint_file + '.tmp'
        with open(tmp_file, 'w') as f:
            json.dump(summaries_so_far, f)
        os.replace(tmp_file, checkpoint_file)

    total_futures = len(futures)
    while not stop_flag:
        # Count completed futures
        completed_count = sum(f.status == 'finished' for f in futures)
        completed_fraction = completed_count / total_futures if total_futures else 1.0

        # Update progress bar
        main_progress.n = int(len(df_to_process) * completed_fraction)
        main_progress.refresh()

        # Save only when something new arrived
        if harvest():
            save_checkpoint()

        # Wait in short slices so a stop request is noticed promptly
        waited = 0.0
        while waited < poll_interval and not stop_flag:
            time.sleep(min(0.2, poll_interval - waited))
            waited += 0.2

    # Final flush: pick up anything that finished during the last wait
    if harvest():
        save_checkpoint()

# Hand the delayed tasks to the cluster; this returns futures immediately
futures = client.compute(delayed_results)

# Flag polled by the monitor loop; set to True to make it stop
stop_flag = False

# Run the progress/checkpoint monitor in the background. It is a daemon
# thread so a still-running monitor cannot keep the process alive.
monitor_thread = threading.Thread(
    target=update_main_progress,
    args=(futures,),
    daemon=True,
)
monitor_thread.start()

# 8. Wait for computation to complete with robust error handling
try:
    try:
        print("Computing all partitions...")
        results = client.gather(futures)
    except Exception as e:
        # Fallback to direct computation if future gathering fails
        print(f"Error with futures: {e}")
        print("Falling back to direct computation...")
        results = dask.compute(*delayed_results)
finally:
    # Always stop the progress monitor, even if the fallback fails too;
    # otherwise the background thread would keep polling indefinitely.
    stop_flag = True
    # The monitor sleeps up to 5 seconds between polls, so allow more than
    # that for it to notice the flag and finish.
    monitor_thread.join(timeout=10)

# Update progress bar to completion
main_progress.n = len(df_to_process)
main_progress.refresh()
main_progress.close()

# 9. Process results with checkpoint recovery
all_results = []

# Gather results from all workers
for worker_results in results:
    if worker_results:  # Check if worker returned any results
        all_results.extend(worker_results)

# Load checkpoint file for any results we already had
checkpoint_file = 'checkpoints/summarization_progress.json'
if os.path.exists(checkpoint_file):
    try:
        with open(checkpoint_file, 'r') as f:
            checkpoint_data = json.load(f)
    except (OSError, ValueError) as e:
        # A damaged checkpoint must not discard the results computed above
        print(f"Ignoring unreadable checkpoint ({e})")
        checkpoint_data = {}

    # Add checkpoint data for any missing indices.
    # A set keeps each membership test constant-time.
    result_indices = {idx for idx, _ in all_results}
    for idx_str, summary in checkpoint_data.items():
        idx = int(idx_str)
        if idx not in result_indices:
            all_results.append((idx, summary))

# Sort by index to maintain order
all_results.sort(key=lambda x: x[0])

# Create a dictionary mapping of indices to summaries
result_dict = {idx: summary for idx, summary in all_results}

# Apply to final report; rows without a result get a placeholder
final_report['QuickSummary'] = final_report.index.map(
    lambda idx: result_dict.get(idx, "Summary not generated")
)

# Report final timing
elapsed_time = time.time() - start_time
print(f"\nCompleted in {elapsed_time:.2f} seconds")
print(f"Successfully summarized {len(result_dict)}/{len(final_report)} items")

# Display results
print("\nSample results:")
display(final_report[['steam_appid', 'Theme', 'QuickSummary']].head())

# 10. Save the results (create the output directory if it is missing)
os.makedirs('output_csvs', exist_ok=True)
final_report.to_csv('output_csvs/dynamic_summarized_report.csv')
print("Results saved to output_csvs/dynamic_summarized_report.csv")

# Shut down the client and cluster
client.close()
cluster.close()