In [3]:
import os
import time
import re
import random
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from bs4 import BeautifulSoup
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)

from langdetect import detect

from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, pipeline
import torch

# Default device for Hugging Face pipelines: GPU 0 when CUDA is available,
# otherwise -1, which the `pipeline(..., device=...)` argument treats as CPU.
device_id = 0 if torch.cuda.is_available() else -1


In [4]:
def perform_random_delay(delay=3, random_offset=0.5):
    """Sleep for `delay` seconds plus a random extra of up to `random_offset` seconds."""
    jitter = random.uniform(0, random_offset)
    pause_seconds = delay + jitter
    time.sleep(pause_seconds)

In [5]:
def extract_ign_review(soup):
    """
    Pull the review body out of an IGN article page.

    Looks for a <div data-cy="article-content"> first and falls back to
    <div id="article-body">. The non-empty text of every paragraph and heading
    inside that container is joined with newlines. Returns None when neither
    container is found.
    """
    text_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']

    body = soup.find("div", {"data-cy": "article-content"})
    if body is None:
        body = soup.find("div", {"id": "article-body"})
    if not body:
        return None

    lines = []
    for node in body.find_all(text_tags):
        # Skip tags whose text is empty once stripped.
        if node.get_text(strip=True):
            lines.append(node.get_text(separator=" ", strip=True))
    return "\n".join(lines)

def extract_pcgamer_review(soup):
    """
    Pull the review body out of a PCGamer article page.

    The container is the <div> with id "article-body", or failing that the
    <div> with class "article-body". Non-empty paragraph and heading texts
    inside it are returned newline-separated; None is returned when no
    container is found.
    """
    body = soup.find("div", {"id": "article-body"}) or soup.find("div", class_="article-body")
    if not body:
        return None

    nodes = body.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
    pieces = [
        node.get_text(separator=" ", strip=True)
        for node in nodes
        if node.get_text(strip=True)  # ignore whitespace-only tags
    ]
    return "\n".join(pieces)

def extract_eurogamer_review(soup):
    """
    Pull the review body out of a Eurogamer article page.

    Prefers the <div> carrying both data-component="article-content" and
    class "article_body"; if that is missing, the first <section> is used.
    Returns the newline-joined text of its non-empty paragraphs and headings,
    or None when no container is found.
    """
    primary_attrs = {"data-component": "article-content", "class": "article_body"}
    body = soup.find("div", primary_attrs)
    if body is None:
        body = soup.find("section")
    if not body:
        return None

    # Lazily filter out blank tags, then render the remaining ones.
    non_blank = (
        node
        for node in body.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        if node.get_text(strip=True)
    )
    return "\n".join(node.get_text(separator=" ", strip=True) for node in non_blank)

def extract_vg247_review(soup):
    """
    Pull the review body out of a VG247 article page.

    Prefers the <div> carrying both data-component="article-content" and
    class "article_body"; otherwise any <div> with class "article_body".
    Returns the newline-joined text of its non-empty paragraphs and headings,
    or None when no container is found.
    """
    body = soup.find("div", {"data-component": "article-content", "class": "article_body"})
    if body is None:
        body = soup.find("div", class_="article_body")
    if not body:
        return None

    collected = []
    for node in body.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
        if not node.get_text(strip=True):
            continue  # whitespace-only tag
        collected.append(node.get_text(separator=" ", strip=True))
    return "\n".join(collected)


In [6]:
def scrape_critic_reviews(game_name, headers=None, websites=None):
    """
    Scrape critic reviews for a game from a set of review websites.

    Args:
        game_name: Human-readable title, e.g. "red dead redemption 2". It is
            lowercased, apostrophes are removed and runs of whitespace become
            single hyphens to build the URL slug.
        headers: Optional HTTP headers sent with every request. Defaults to a
            minimal User-Agent / Accept-Language pair.
        websites: Optional mapping of site name -> {"url": <template containing
            "{slug}">, "extractor": <callable taking the parsed BeautifulSoup
            page and returning the review text or a falsy value>}. When None,
            IGN, PCGamer, Eurogamer and VG247 are used.

    Returns:
        DataFrame with columns 'website', 'review_text', 'url', one row per
        site that answered with HTTP 200 and yielded review text. The columns
        are present even when nothing could be scraped. Sites that fail are
        logged and skipped.
    """
    if headers is None:
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept-Language": "en-US,en;q=0.9"
        }

    # Only fall back to the built-in sites when the caller did not supply any;
    # previously the argument was silently overwritten.
    if websites is None:
        websites = {
            "IGN": {
                "url": "https://www.ign.com/articles/{slug}-review",
                "extractor": extract_ign_review
            },
            "PCGamer": {
                "url": "https://www.pcgamer.com/{slug}-review/",
                "extractor": extract_pcgamer_review
            },
            "Eurogamer": {
                "url": "https://www.eurogamer.net/articles/{slug}-review/",
                "extractor": extract_eurogamer_review
            },
            "VG247": {
                "url": "https://www.vg247.com/{slug}-review/",
                "extractor": extract_vg247_review
            }
        }

    reviews_list = []
    # Simple slug: lowercase, drop apostrophes, collapse whitespace runs to one
    # hyphen (split() also discards leading/trailing whitespace).
    slug = "-".join(game_name.lower().replace("'", "").split())

    for site_name, site_info in websites.items():
        url = site_info['url'].format(slug=slug)
        logging.info(f"Scraping {site_name} from URL: {url}")
        try:
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code != 200:
                logging.warning(f"Failed to retrieve review from {site_name}. HTTP Status: {response.status_code}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            review_text = site_info["extractor"](soup)
            if not review_text:
                logging.warning(f"No review text found on {site_name}.")
                continue

            reviews_list.append({
                "website": site_name,
                "review_text": review_text,
                "url": url
            })

            perform_random_delay(1, 0.3)

        except Exception as e:
            # Best-effort scraping: one failing site must not abort the others.
            logging.error(f"Error while scraping {site_name}: {e}")

    # Explicit columns keep the schema stable when no site produced a review.
    return pd.DataFrame(reviews_list, columns=["website", "review_text", "url"])

In [7]:
def clean_review_data(df, dropna=True, drop_duplicated=True, remove_spoiler=True, all_languages=False, selected_languages=['en']):
    """
    Return a cleaned version of a reviews DataFrame without modifying the input.

    Args:
        df: DataFrame with at least a 'review_text' column.
        dropna: Drop rows containing any NA value.
        drop_duplicated: Drop fully duplicated rows.
        remove_spoiler: Drop rows whose 'review_text' is exactly the
            standardized spoiler placeholder.
        all_languages: When False, detect each review's language and keep only
            rows whose language is in `selected_languages`. Texts for which
            detection raises are labelled 'unknown'.
        selected_languages: Language codes to keep. It is only read, never
            mutated.

    Returns:
        The filtered DataFrame. A 'language' column is present only when more
        than one language is selected; otherwise any such column is dropped.
    """
    if dropna:
        df = df.dropna()
    if drop_duplicated:
        df = df.drop_duplicates()
    if remove_spoiler:
        df = df[df['review_text'] != "[SPOILER ALERT: This review contains spoilers.]"]

    if not all_languages:
        languages = []
        for text in df['review_text']:
            try:
                languages.append(detect(text))
            except Exception:
                # Undetectable text is kept as 'unknown' rather than aborting.
                languages.append('unknown')
        # assign() builds a new frame, so neither the caller's DataFrame nor a
        # filtered slice of it is written to.
        df = df.assign(language=languages)
        df = df[df['language'].isin(selected_languages)]

    if len(selected_languages) <= 1 and 'language' in df.columns:
        # Non-inplace drop: when no earlier step made a copy, `df` is still the
        # caller's object and must not lose its column.
        df = df.drop(columns=['language'])

    return df

In [8]:
def find_aspects(text, aspects, classifier=None):
    """
    Score the sentiment of `text` towards each aspect keyword.

    Args:
        text: Review text to analyse.
        aspects: Iterable of aspect keywords (e.g. 'graphics', 'story').
        classifier: Optional callable invoked as `classifier(text, text_pair=aspect)`
            and returning a list whose first item has 'label' and 'score' keys.
            When None, the "yangheng/deberta-v3-base-absa-v1.1" pipeline is
            loaded on every call, so pass a shared classifier for repeated use.

    Returns:
        Dict mapping each aspect to a signed score: +confidence for a
        'Positive' label, -confidence for 'Negative', and 0.0 for any other
        label (e.g. 'Neutral') or when classification of that aspect raises.
    """
    if classifier is None:
        model_name = "yangheng/deberta-v3-base-absa-v1.1"
        use_cuda = torch.cuda.is_available()
        tokenizer_absa = AutoTokenizer.from_pretrained(model_name, use_fast=False)
        # Half precision only on GPU (to reduce memory usage); full precision on CPU.
        model_absa = AutoModelForSequenceClassification.from_pretrained(
            model_name, torch_dtype=torch.float16 if use_cuda else torch.float32
        )
        if use_cuda:
            model_absa = model_absa.to("cuda")
        classifier = pipeline(
            "text-classification",
            model=model_absa,
            tokenizer=tokenizer_absa,
            device=device_id if use_cuda else -1,  # -1 selects the CPU
        )

    aspect_scores = {}
    for aspect in aspects:
        try:
            result = classifier(text, text_pair=aspect)[0]
            label = str(result['label']).lower()
            if label == 'positive':
                score = result['score']
            elif label == 'negative':
                score = -result['score']
            else:
                # A confident 'Neutral' prediction is not negative sentiment.
                score = 0.0
            aspect_scores[aspect] = score
        except Exception as e:
            logging.warning(f"ABSA error for aspect '{aspect}': {e}")
            aspect_scores[aspect] = 0.0
    return aspect_scores

In [9]:
# Upper bound on extra chunk-and-summarize passes over an over-long combined summary.
_MAX_RESUMMARIZE_ROUNDS = 3


def _split_into_token_bounded_chunks(text, tokenizer, max_tokens):
    """Greedily pack whole sentences of `text` into chunks of at most `max_tokens` tokens."""
    sentences = re.split(r'(?<=[.?!])\s+', text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        candidate = current_chunk + " " + sentence if current_chunk else sentence
        if len(tokenizer.encode(candidate, add_special_tokens=True)) <= max_tokens:
            current_chunk = candidate
        else:
            if current_chunk:
                chunks.append(current_chunk)
            # A single sentence longer than max_tokens still becomes its own chunk.
            current_chunk = sentence
    if current_chunk:
        chunks.append(current_chunk)
    return chunks


def summarize_text(text, summarizer=None):
    """
    Summarize `text` with a summarization pipeline.

    Text that fits the tokenizer's `model_max_length` is summarized in one
    call. Longer text is split into sentence-aligned chunks, each chunk is
    summarized and the summaries are joined with spaces. If that combined
    summary is itself too long, it is chunked and summarized again (up to
    `_MAX_RESUMMARIZE_ROUNDS` times, stopping early when a pass no longer
    shrinks it) and then condensed by one final summarization call.

    Args:
        text: Text to summarize. None, empty or whitespace-only input returns "".
        summarizer: Optional callable exposing a `.tokenizer` attribute and
            returning `[{'summary_text': ...}]`. Defaults to a "t5-small" pipeline.

    Returns:
        The summary string, or "" when the input is blank or summarization raises.
    """
    if not text or not text.strip():
        # Nothing to summarize; avoid loading a model for blank input.
        return ""
    if summarizer is None:
        summarizer = pipeline("summarization", model="t5-small", tokenizer="t5-small", device=device_id)
    try:
        tokenizer = summarizer.tokenizer
        max_input_length = tokenizer.model_max_length  # usually 512 tokens for T5-small

        def count_tokens(piece):
            return len(tokenizer.encode(piece, add_special_tokens=True))

        def summarize_once(piece):
            return summarizer(piece, max_length=150, min_length=40, do_sample=False)[0]['summary_text']

        def summarize_by_chunks(long_text):
            chunks = _split_into_token_bounded_chunks(long_text, tokenizer, max_input_length)
            return " ".join(summarize_once(chunk) for chunk in chunks)

        if count_tokens(text) <= max_input_length:
            return summarize_once(text)

        combined_summary = summarize_by_chunks(text)
        if count_tokens(combined_summary) <= max_input_length:
            return combined_summary

        # The combined summary is still too long for a single pass: shrink it
        # chunk-wise first instead of feeding an over-length input to the model.
        for _ in range(_MAX_RESUMMARIZE_ROUNDS):
            reduced = summarize_by_chunks(combined_summary)
            if count_tokens(reduced) >= count_tokens(combined_summary):
                break  # no progress; stop rather than loop pointlessly
            combined_summary = reduced
            if count_tokens(combined_summary) <= max_input_length:
                break
        return summarize_once(combined_summary)
    except Exception as e:
        logging.error(f"Summarization error: {e}")
        return ""

def comment_analysis_with_summary(game_name, aspects, headers=None):
    """
    Run the full review-analysis pipeline for one game.

    Steps:
      1. Scrape critic reviews for the game.
      2. Clean the review data.
      3. Load the ABSA classifier and a T5-small summarizer (only once there is
         something to analyse).
      4. Compute aspect sentiment scores for each review.
      5. Join all review texts and summarize the aggregate.

    Args:
        game_name: Title of the game, passed to `scrape_critic_reviews`.
        aspects: Aspect keywords to score in every review.
        headers: Optional HTTP headers forwarded to the scraper.

    Returns:
        Tuple `(overall_aspect_scores, overall_summary, df_reviews)`:
        a dict of per-aspect scores averaged over reviews, the summary string,
        and the reviews DataFrame with one extra column per aspect. When no
        reviews are scraped, or none survive cleaning, `({}, "", df_reviews)`
        is returned.
    """
    logging.info("Scraping critic reviews...")
    df_reviews = scrape_critic_reviews(game_name, headers=headers)
    if df_reviews.empty:
        logging.error("No reviews scraped. Exiting analysis.")
        return {}, "", df_reviews

    logging.info("Cleaning review data...")
    df_reviews = clean_review_data(df_reviews)
    if df_reviews.empty:
        # E.g. every review was filtered out by language; averaging an empty
        # score table would otherwise fail further down.
        logging.error("No reviews left after cleaning. Exiting analysis.")
        return {}, "", df_reviews

    use_cuda = torch.cuda.is_available()
    pipeline_device = device_id if use_cuda else -1  # -1 selects the CPU
    device_label = "GPU" if use_cuda else "CPU"

    logging.info("Loading ABSA model for aspect analysis...")
    model_name_absa = "yangheng/deberta-v3-base-absa-v1.1"
    tokenizer_absa = AutoTokenizer.from_pretrained(model_name_absa, use_fast=False)
    # Half precision only on GPU; full precision on CPU.
    model_absa = AutoModelForSequenceClassification.from_pretrained(
        model_name_absa, torch_dtype=torch.float16 if use_cuda else torch.float32
    )
    if use_cuda:
        model_absa = model_absa.to("cuda")
    classifier = pipeline("text-classification", model=model_absa, tokenizer=tokenizer_absa, device=pipeline_device)
    logging.info(f"ABSA model loaded on {device_label}.")

    logging.info("Initializing summarization pipeline (T5-small)...")
    summarizer = pipeline("summarization", model="t5-small", tokenizer="t5-small", device=pipeline_device)
    logging.info(f"Summarization model loaded on {device_label}.")

    logging.info("Performing aspect-based sentiment analysis...")
    # One row of aspect scores per review, aligned on the reviews' index so the
    # column-wise concat below lines up row by row.
    scores_df = pd.DataFrame(
        [find_aspects(review, aspects, classifier=classifier) for review in df_reviews['review_text']],
        index=df_reviews.index,
    )
    df_reviews = pd.concat([df_reviews, scores_df], axis=1)
    logging.info("Aspect analysis complete!")

    # Combine all review texts into one aggregated text
    combined_text = "\n\n".join(df_reviews['review_text'].tolist())
    logging.info("Generating overall summary from combined reviews...")
    overall_summary = summarize_text(combined_text, summarizer=summarizer)
    logging.info("Overall summarization complete!")

    overall_aspect_scores = scores_df.mean().to_dict()  # Averaging scores for better interpretability

    return overall_aspect_scores, overall_summary, df_reviews

In [None]:
if __name__ == '__main__':
    # Example usage: analyze "red dead redemption 2" reviews for several aspects
    game_name = "red dead redemption 2"
    aspects = ['graphics', 'gameplay', 'story', 'performance']

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/90.0.4430.85 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9"
    }

    overall_aspect_scores, overall_summary, df_reviews = comment_analysis_with_summary(game_name, aspects, headers=headers)

    # Print the overall summary based on all scraped reviews
    print("Overall Summary of Scraped Reviews:")
    print(overall_summary)

    # Print overall averaged aspect scores
    print("\nOverall Aspect Scores (averaged over reviews):")
    for aspect, score in overall_aspect_scores.items():
        print(f"{aspect}: {score:.3f}")

    if not overall_aspect_scores:
        # The analysis returns an empty dict when no reviews were available;
        # unpacking zip(*[]) below would raise, so skip the chart.
        print("No aspect scores available; skipping plot.")
    else:
        # Sort ascending by score for a cleaner horizontal bar chart.
        sorted_data = sorted(overall_aspect_scores.items(), key=lambda item: item[1])
        sorted_labels, sorted_scores = zip(*sorted_data)

        # Colour bars by their position between the lowest and highest score
        # on the coolwarm colormap; use a flat colour when all scores are equal.
        min_score = min(sorted_scores)
        max_score = max(sorted_scores)
        if max_score - min_score == 0:
            colors = ['skyblue'] * len(sorted_scores)
        else:
            colors = [cm.coolwarm((s - min_score) / (max_score - min_score)) for s in sorted_scores]

        plt.figure(figsize=(8, 6))
        bars = plt.barh(sorted_labels, sorted_scores, color=colors)
        plt.xlabel('Average Sentiment Score')
        plt.title('Overall Aspect Sentiment Scores')
        plt.grid(axis='x', linestyle='--', alpha=0.7)

        # Annotate bars with their values; labels go on the outer side of the
        # bar so they do not overlap bars that extend to the left (negative).
        for bar in bars:
            width = bar.get_width()
            plt.text(width, bar.get_y() + bar.get_height()/2,
                     f'{width:.2f}', ha='left' if width >= 0 else 'right',
                     va='center', fontsize=10)

        plt.tight_layout()
        plt.show()