<a href="https://colab.research.google.com/github/abhy-kumar/NLPulse/blob/main/NLPulse.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install feedparser TextBlob
!pip install nltk transformers torch
#Cell 1

Collecting feedparser
  Downloading feedparser-6.0.11-py3-none-any.whl.metadata (2.4 kB)
Collecting sgmllib3k (from feedparser)
  Downloading sgmllib3k-1.0.0.tar.gz (5.8 kB)
  Preparing metadata (setup.py) ... done
Downloading feedparser-6.0.11-py3-none-any.whl (81 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 81.3/81.3 kB 1.3 MB/s eta 0:00:00
Building wheels for collected packages: sgmllib3k
  Building wheel for sgmllib3k (setup.py) ... done
  Created wheel for sgmllib3k: filename=sgmllib3k-1.0.0-py3-none-any.whl size=6047 sha256=38d6d78cafceadb3d5017b0643474d9bc5eca48574fba455b6fd1ba2e4985480
  Stored in directory: /root/.cache/pip/wheels/f0/69/93/a47e9d621be168e9e33c7ce60524393c0b92ae83cf6c6e89c5
Successfully built sgmllib3k
Installing collected packages: sgmllib3k, feedparser
Successfully installed feedparser-6.0.11 sgmllib3k-1.0.0


In [5]:
# Cell 2

import nltk
import torch
import requests
import feedparser
import sqlite3
import pandas as pd
import plotly.graph_objects as go
import numpy as np
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
from nltk.sentiment import SentimentIntensityAnalyzer
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    pipeline
)
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
import logging
from typing import List, Tuple, Set
from wordcloud import WordCloud
import seaborn as sns
from collections import Counter
import matplotlib.pyplot as plt
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import warnings
from difflib import SequenceMatcher

# Suppress every warning category for the rest of the session.
warnings.filterwarnings(action='ignore')

# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Configuration -----------------------------------------------------------
# SQLite file holding the scored headlines.
DATABASE_NAME = 'news_sentiment.db'
# RSS feed the articles are pulled from.
NEWS_FEED_URL = "https://www.thehindu.com/feeder/default.rss"
# Layout of the stored `date` column.
DATE_FORMAT = "%Y-%m-%d"
# Layout used to parse a feed entry's `published` string.
DATETIME_FORMAT = "%a, %d %b %Y %H:%M:%S %z"

class DatabaseManager:
    """
    Manages the SQLite database operations for storing and retrieving sentiment scores.
    It handles the creation of the necessary tables, insertion of new data,
    and various queries to retrieve statistics and specific datasets.

    One persistent connection (``self.connection``) serves all reads and writes;
    call ``close()`` when finished with the manager.
    """
    def __init__(self, db_name: str):
        self.db_name = db_name
        self.connection = None
        self._setup_database()
        self._setup_connection()

    def _setup_database(self):
        """
        Initializes the database by creating the sentiment_scores table if it does not exist.
        Also creates indexes on date, title, and score columns to optimize query performance.
        """
        conn = sqlite3.connect(self.db_name)
        try:
            # `with conn` only commits or rolls back; it does not close the
            # connection, hence the surrounding try/finally.
            with conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS sentiment_scores (
                        date TEXT,
                        time TEXT,
                        title TEXT,
                        summary TEXT,
                        score REAL
                    )
                ''')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON sentiment_scores(date)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_title ON sentiment_scores(title)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_score ON sentiment_scores(score)')
        finally:
            conn.close()

    def _setup_connection(self):
        """
        Establishes a persistent connection to the SQLite database with optimized parameters
        for performance. Configures the database to use Write-Ahead Logging and sets cache sizes.
        Does nothing when a connection is already open.
        """
        if not self.connection:
            self.connection = sqlite3.connect(self.db_name, check_same_thread=False)
            self.connection.execute('PRAGMA journal_mode=WAL')  # Write-Ahead Logging
            self.connection.execute('PRAGMA synchronous=NORMAL')
            self.connection.execute('PRAGMA cache_size=-2000')  # 2MB cache
            self.connection.execute('PRAGMA temp_store=MEMORY')

    def store_score(self, date: str, time: str, title: str, summary: str, score: float):
        """
        Inserts a new sentiment score entry into the sentiment_scores table.

        Failures are logged and swallowed; nothing is raised to the caller.

        Parameters:
            date (str): The date of the news article.
            time (str): The time of publication.
            title (str): The headline of the news article.
            summary (str): The summary of the news article.
            score (float): The sentiment score associated with the article.
        """
        try:
            self.connection.execute(
                "INSERT INTO sentiment_scores VALUES (?, ?, ?, ?, ?)",
                (date, time, title, summary, score)
            )
            self.connection.commit()
        except Exception as e:
            logging.error(f"Error storing score: {e}")

    def get_daily_average(self, date: str) -> float:
        """
        Retrieves the average sentiment score for a specific date.

        The query runs on every call. Results are deliberately not memoised:
        a cached value would go stale as soon as store_score() adds another
        row for the same date.

        Parameters:
            date (str): The date for which to calculate the average sentiment.

        Returns:
            float: The average sentiment score, or 0.0 if no entries are found
            or the query fails.
        """
        try:
            result = self.connection.execute(
                "SELECT AVG(score) FROM sentiment_scores WHERE date = ?",
                (date,)
            ).fetchone()
            return result[0] if result[0] is not None else 0.0
        except Exception as e:
            logging.error(f"Error getting daily average: {e}")
            return 0.0

    def get_headlines_with_scores(self, limit: int = 10, min_summary_length: int = 20) -> pd.DataFrame:
        """
        Retrieves recent headlines along with their sentiment scores,
        excluding entries with summaries shorter than the specified length.

        Parameters:
            limit (int): The maximum number of headlines to retrieve.
            min_summary_length (int): The minimum length of the summary to include an entry.

        Returns:
            pd.DataFrame: A DataFrame with date, time, title and score columns,
            newest first; an empty DataFrame if the query fails.
        """
        try:
            query = """
                SELECT date, time, title, score
                FROM sentiment_scores
                WHERE LENGTH(summary) >= ?
                ORDER BY date DESC, time DESC
                LIMIT ?
            """
            return pd.read_sql_query(query, self.connection, params=(min_summary_length, limit))
        except Exception as e:
            logging.error(f"Error retrieving headlines: {e}")
            return pd.DataFrame()

    def get_extreme_sentiment_headlines(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Retrieves the top five most positive and top five most negative headlines based on sentiment scores.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: Two DataFrames containing the most positive
            and most negative headlines; two empty DataFrames if either query fails.
        """
        try:
            most_positive = pd.read_sql_query("""
                SELECT date, time, title, score
                FROM sentiment_scores
                ORDER BY score DESC
                LIMIT 5
            """, self.connection)

            most_negative = pd.read_sql_query("""
                SELECT date, time, title, score
                FROM sentiment_scores
                ORDER BY score ASC
                LIMIT 5
            """, self.connection)

            return most_positive, most_negative
        except Exception as e:
            logging.error(f"Error retrieving extreme headlines: {e}")
            return pd.DataFrame(), pd.DataFrame()

    def get_headline_stats(self) -> dict:
        """
        Gathers statistical information about the headlines stored in the database,
        including average sentiment, total number of headlines, and the date range covered.

        Returns:
            dict: Keys 'average_sentiment' (float, 0.0 when the table is empty),
            'total_headlines' (int) and 'date_range' (str). An empty dict is
            returned if any query fails.
        """
        try:
            stats = {}

            avg_query = "SELECT AVG(score) FROM sentiment_scores"
            average = self.connection.execute(avg_query).fetchone()[0]
            # AVG() over zero rows is NULL; report 0.0 (as get_daily_average does)
            # so callers can format the value numerically.
            stats['average_sentiment'] = average if average is not None else 0.0

            count_query = "SELECT COUNT(*) FROM sentiment_scores"
            stats['total_headlines'] = self.connection.execute(count_query).fetchone()[0]

            range_query = """
                SELECT
                    MIN(date) as earliest_date,
                    MAX(date) as latest_date
                FROM sentiment_scores
            """
            earliest, latest = self.connection.execute(range_query).fetchone()
            stats['date_range'] = f"{earliest} to {latest}"

            return stats
        except Exception as e:
            logging.error(f"Error getting headline stats: {e}")
            return {}

    def close(self):
        """
        Closes the database connection if it is open. Safe to call repeatedly.
        """
        if self.connection:
            self.connection.close()
            # Drop the handle so later close() calls are no-ops.
            self.connection = None

class SentimentAnalyzer:
    """
    Performs sentiment analysis on text data using multiple NLP models.
    Combines results from VADER, FinBERT, and RoBERTa models to produce a comprehensive sentiment score.
    """

    # NLTK download id -> location under nltk.data where that resource lives.
    # Each resource sits in its own category directory, so probing
    # 'tokenizers/<id>' for all of them never finds the lexicon or stopwords.
    _NLTK_RESOURCES = {
        'vader_lexicon': 'sentiment/vader_lexicon.zip',
        'punkt': 'tokenizers/punkt',
        'stopwords': 'corpora/stopwords',
    }

    # Score on the 0-10 scale for each named sentiment class.
    _LABEL_SCORES = {'negative': 0.0, 'neutral': 5.0, 'positive': 10.0}

    def __init__(self):
        self._initialize_models()
        self._setup_device()

    def _initialize_models(self):
        """
        Initializes the necessary NLP models and tokenizers.
        Downloads required NLTK data if not already present.
        """
        for resource, data_path in self._NLTK_RESOURCES.items():
            try:
                nltk.data.find(data_path)
            except LookupError:
                nltk.download(resource, quiet=True)

        self.sia = SentimentIntensityAnalyzer()

        self.finbert_tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
        self.finbert_model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
        self.roberta_tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
        self.roberta_model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
        # NOTE(review): not used by any method of this class; kept because code
        # outside this class may rely on the attribute.
        self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

    def _setup_device(self):
        """
        Configures the computational device (GPU if available, else CPU)
        and moves the NLP models to the appropriate device.
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.finbert_model.to(self.device)
        self.roberta_model.to(self.device)

    # NOTE: lru_cache on a method keys on `self` and keeps the instance alive
    # for the lifetime of the cache; acceptable for a single long-lived analyzer.
    @lru_cache(maxsize=1024)
    def analyze_sentiment(self, text: str) -> float:
        """
        Analyzes the sentiment of the provided text using VADER, FinBERT, and RoBERTa models.
        Combines the scores from each model into a confidence-weighted average to produce
        a final sentiment score.

        Parameters:
            text (str): The text to analyze.

        Returns:
            float: A sentiment score between 0 and 10. The neutral value 5.0 is
            returned for empty/very short text (under 10 characters after
            stripping) and when any model raises.
        """
        if not text or len(text.strip()) < 10:
            return 5.0

        try:
            with ThreadPoolExecutor(max_workers=3) as executor:
                vader_future = executor.submit(self._get_vader_score, text)
                finbert_future = executor.submit(self._get_finbert_score, text)
                roberta_future = executor.submit(self._get_roberta_score, text)

                vader_score, vader_confidence = vader_future.result()
                finbert_score, finbert_confidence = finbert_future.result()
                roberta_score, roberta_confidence = roberta_future.result()

            # Each model's vote is weighted by its own confidence.
            weights = np.array([vader_confidence, finbert_confidence, roberta_confidence])
            weights = weights / weights.sum()
            scores = np.array([vader_score, finbert_score, roberta_score])

            combined_score = np.dot(weights, scores)
            # Clamp to the documented range and hand back a plain Python float.
            return float(max(0.0, min(combined_score, 10.0)))

        except Exception as e:
            logging.error(f"Error in sentiment analysis: {e}")
            return 5.0

    def _get_vader_score(self, text: str) -> Tuple[float, float]:
        """
        Computes the sentiment score using the VADER model.

        Parameters:
            text (str): The text to analyze.

        Returns:
            Tuple[float, float]: The sentiment score scaled between 0 and 10, and the
            confidence score (absolute value of VADER's compound score).
        """
        scores = self.sia.polarity_scores(text)
        return (scores['compound'] + 1) * 5, abs(scores['compound'])

    @staticmethod
    def _score_for_class(id2label, class_index: int) -> float:
        """
        Maps a predicted class index to the 0-10 sentiment scale.

        When the model's id2label mapping names the class negative/neutral/positive,
        the name decides the score, so the model's own class order does not matter.
        Otherwise (no mapping, or generic names such as 'LABEL_0') the index is
        assumed to follow negative=0, neutral=1, positive=2 and is scaled by 5.

        Parameters:
            id2label: Mapping from class index to label name, or None.
            class_index (int): The index of the predicted class.

        Returns:
            float: 0.0 for negative, 5.0 for neutral, 10.0 for positive.
        """
        label = ''
        if id2label:
            label = str(id2label.get(class_index, '')).lower()
        if label in SentimentAnalyzer._LABEL_SCORES:
            return SentimentAnalyzer._LABEL_SCORES[label]
        return class_index * 5.0

    def _classify(self, tokenizer, model, text: str) -> Tuple[float, float]:
        """
        Runs one sequence-classification model on the text.

        Parameters:
            tokenizer: The tokenizer paired with the model.
            model: The sequence-classification model.
            text (str): The text to analyze (truncated to 512 tokens).

        Returns:
            Tuple[float, float]: The 0-10 score of the most probable class and
            that class's softmax probability.
        """
        with torch.no_grad():
            inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(self.device)
            outputs = model(**inputs)
            probs = torch.nn.functional.softmax(outputs.logits, dim=1)
            class_index = int(torch.argmax(probs).item())
            confidence = torch.max(probs).item()
        id2label = getattr(getattr(model, 'config', None), 'id2label', None)
        return self._score_for_class(id2label, class_index), confidence

    def _get_finbert_score(self, text: str) -> Tuple[float, float]:
        """
        Computes the sentiment score using the FinBERT model.

        FinBERT's class indices are not ordered negative -> positive, so the
        score is derived from the predicted label's name rather than its index.

        Parameters:
            text (str): The text to analyze.

        Returns:
            Tuple[float, float]: The sentiment score scaled between 0 and 10, and the confidence score.
        """
        return self._classify(self.finbert_tokenizer, self.finbert_model, text)

    def _get_roberta_score(self, text: str) -> Tuple[float, float]:
        """
        Computes the sentiment score using the RoBERTa model.

        Parameters:
            text (str): The text to analyze.

        Returns:
            Tuple[float, float]: The sentiment score scaled between 0 and 10, and the confidence score.
        """
        return self._classify(self.roberta_tokenizer, self.roberta_model, text)

import sqlite3
import pandas as pd
import plotly.graph_objects as go
import logging

class DataVisualizer:
    """
    Generates various visualizations and dashboards based on the sentiment scores stored in the database.
    Creates multiple subplots to display different aspects of the data, such as daily counts, sentiment trends,
    distribution of summaries, and more.
    """
    @staticmethod
    def create_visualizations(db_name: str):
        """
        Creates an enhanced set of visualizations that provide insights into the sentiment analysis data.
        Includes timelines, distributions, heatmaps, and scatter plots to illustrate different metrics.

        Loads the whole sentiment_scores table, builds a 3x3 plotly dashboard and
        shows it. Logs a warning and returns without plotting when the table is empty.

        Parameters:
            db_name (str): The name of the SQLite database to retrieve data from.
        """
        # sqlite3's `with conn:` only commits or rolls back; close explicitly so
        # the connection is not leaked on each dashboard render.
        conn = sqlite3.connect(db_name)
        try:
            df = pd.read_sql_query("SELECT * FROM sentiment_scores", conn)
        finally:
            conn.close()

        if df.empty:
            logging.warning("No data available for visualization")
            return

        df['date'] = pd.to_datetime(df['date'])
        df['hour'] = pd.to_datetime(df['time']).dt.hour

        fig = make_subplots(
            rows=3, cols=3,
            subplot_titles=(
                'Daily Entry Counts',
                'Hourly Distribution',
                'Sentiment Timeline',
                'Summary Length Distribution',
                'Sentiment Distribution',
                'Weekly Patterns',
                'Sentiment Moving Average',
                'Headline Length vs Sentiment',
                'Time of Day Sentiment'
            ),
            specs=[
                [{'type': 'scatter'}, {'type': 'bar'}, {'type': 'scatter'}],
                [{'type': 'histogram'}, {'type': 'histogram'}, {'type': 'heatmap'}],
                [{'type': 'scatter'}, {'type': 'scatter'}, {'type': 'scatter'}]
            ],
            horizontal_spacing=0.12,
            vertical_spacing=0.15
        )

        # (1,1) number of stored articles per day
        daily_counts = df.groupby('date').size()
        fig.add_trace(
            go.Scatter(
                x=daily_counts.index,
                y=daily_counts.values,
                mode='lines+markers',
                name='Daily Entries',
                line=dict(width=2, color='royalblue'),
                marker=dict(size=6)
            ),
            row=1, col=1
        )

        # (1,2) number of articles per publication hour
        hourly_counts = df['hour'].value_counts().sort_index()
        fig.add_trace(
            go.Bar(
                x=hourly_counts.index,
                y=hourly_counts.values,
                name='Hourly Distribution',
                marker_color='lightblue'
            ),
            row=1, col=2
        )

        # (1,3) mean sentiment per day
        daily_sentiment = df.groupby('date')['score'].mean()
        fig.add_trace(
            go.Scatter(
                x=daily_sentiment.index,
                y=daily_sentiment.values,
                mode='lines',
                name='Daily Sentiment',
                line=dict(color='green', width=2)
            ),
            row=1, col=3
        )

        # (2,1) distribution of summary lengths
        summary_lengths = DataVisualizer.get_summary_lengths(db_name)
        fig.add_trace(
            go.Histogram(
                x=summary_lengths,
                name='Summary Lengths',
                nbinsx=30,
                marker_color='lightgreen'
            ),
            row=2, col=1
        )

        # (2,2) distribution of sentiment scores, normalised to probabilities
        fig.add_trace(
            go.Histogram(
                x=df['score'],
                name='Sentiment Distribution',
                nbinsx=20,
                histnorm='probability',
                marker_color='coral'
            ),
            row=2, col=2
        )

        # (2,3) mean sentiment by weekday x hour; 'date' is already datetime here
        df['weekday'] = df['date'].dt.day_name()
        weekly_sentiment = df.pivot_table(
            values='score',
            index='weekday',
            columns='hour',
            aggfunc='mean'
        )

        # Force calendar order; weekdays with no data become all-NaN rows.
        weekday_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        weekly_sentiment = weekly_sentiment.reindex(weekday_order)

        fig.add_trace(
            go.Heatmap(
                z=weekly_sentiment.values,
                x=weekly_sentiment.columns,
                y=weekly_sentiment.index,
                colorscale='RdYlBu',
                name='Weekly Patterns',
                colorbar=dict(
                    title="Sentiment",
                    thickness=10,
                    len=0.3,
                    yanchor="middle",
                    y=0.5,
                    xanchor="left",
                    x=1.02
                )
            ),
            row=2, col=3
        )

        # (3,1) 7-day rolling mean of the daily sentiment; the first six points
        # are NaN because a full window is required.
        daily_sentiment = df.groupby('date')['score'].mean().reset_index()
        daily_sentiment['MA7'] = daily_sentiment['score'].rolling(window=7).mean()
        fig.add_trace(
            go.Scatter(
                x=daily_sentiment['date'],
                y=daily_sentiment['MA7'],
                mode='lines',
                name='7-Day Moving Average',
                line=dict(color='purple', width=2)
            ),
            row=3, col=1
        )

        # (3,2) headline length against sentiment
        df['title_length'] = df['title'].str.len()
        fig.add_trace(
            go.Scatter(
                x=df['title_length'],
                y=df['score'],
                mode='markers',
                name='Length vs Sentiment',
                marker=dict(
                    size=6,
                    color=df['score'],
                    colorscale='Viridis',
                    showscale=False
                )
            ),
            row=3, col=2
        )

        # (3,3) mean sentiment per publication hour
        hourly_sentiment = df.groupby('hour')['score'].mean()
        fig.add_trace(
            go.Scatter(
                x=hourly_sentiment.index,
                y=hourly_sentiment.values,
                mode='lines+markers',
                name='Hourly Sentiment',
                line=dict(shape='spline', color='orangered', width=2)
            ),
            row=3, col=3
        )

        fig.update_layout(
            height=1200,
            width=1600,
            showlegend=True,
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=-0.2,
                xanchor="center",
                x=0.5
            ),
            title_text="Enhanced News Analysis Dashboard",
            template="plotly_white"
        )

        # (row, col) -> (x-axis title, y-axis title)
        axis_labels = {
            (1,1): ('Date', 'Number of Articles'),
            (1,2): ('Hour of Day', 'Number of Articles'),
            (1,3): ('Date', 'Average Sentiment Score'),
            (2,1): ('Summary Length (characters)', 'Frequency'),
            (2,2): ('Sentiment Score', 'Probability'),
            (2,3): ('Hour of Day', 'Day of Week'),
            (3,1): ('Date', '7-Day Moving Average'),
            (3,2): ('Headline Length (characters)', 'Sentiment Score'),
            (3,3): ('Hour of Day', 'Average Sentiment Score')
        }

        for (row, col), (xlabel, ylabel) in axis_labels.items():
            fig.update_xaxes(title_text=xlabel, row=row, col=col)
            fig.update_yaxes(title_text=ylabel, row=row, col=col)

        fig.show()

    @staticmethod
    def get_summary_lengths(db_name: str) -> list:
        """
        Retrieves the lengths of all summaries stored in the sentiment_scores table.

        Parameters:
            db_name (str): The name of the SQLite database to query.

        Returns:
            list: One length per row (None where the summary is NULL);
            an empty list if the database cannot be opened or queried.
        """
        try:
            conn = sqlite3.connect(db_name)
            try:
                rows = conn.execute("SELECT LENGTH(summary) FROM sentiment_scores").fetchall()
            finally:
                # Always release the connection, even when the query fails.
                conn.close()
            return [row[0] for row in rows]
        except Exception as e:
            logging.error(f"Error getting summary lengths: {e}")
            return []

class NewsFetcher:
    """
    Fetches news articles from an RSS feed, processes them to analyze sentiment,
    and stores the results in the database. Ensures that duplicate or similar
    articles are not processed multiple times.
    """
    def __init__(self, db_manager, sentiment_analyzer):
        self.db_manager = db_manager
        self.sentiment_analyzer = sentiment_analyzer
        self._initialize_processed_titles()

    def _initialize_processed_titles(self):
        """
        Loads existing article titles from the database to avoid processing duplicates.

        Rows whose title is NULL are skipped so later string comparisons cannot
        fail on None. On any error the set of known titles starts out empty.
        """
        try:
            conn = sqlite3.connect(self.db_manager.db_name)
            try:
                rows = conn.execute("SELECT title FROM sentiment_scores").fetchall()
            finally:
                # `with sqlite3.connect(...)` would not close the connection.
                conn.close()
            self.processed_titles = {row[0] for row in rows if row[0] is not None}
            logging.info(f"Loaded {len(self.processed_titles)} existing titles from database")
        except Exception as e:
            logging.error(f"Error loading existing titles: {e}")
            self.processed_titles = set()

    def _is_similar_to_existing(self, title: str, threshold: float = 0.85) -> bool:
        """
        Determines if a given title is similar to any already processed titles using SequenceMatcher.
        The comparison is case-insensitive.

        Parameters:
            title (str): The title to compare.
            threshold (float): The similarity threshold above which a title is considered similar.

        Returns:
            bool: True if the title is similar to an existing one, False otherwise.
        """
        candidate = title.lower()
        for existing_title in self.processed_titles:
            matcher = SequenceMatcher(None, candidate, existing_title.lower())
            # Both quick ratios are cheap upper bounds on ratio(); when either is
            # already at or below the threshold the full comparison cannot pass.
            if matcher.real_quick_ratio() <= threshold or matcher.quick_ratio() <= threshold:
                continue
            similarity = matcher.ratio()
            if similarity > threshold:
                logging.info(f"Found similar title (similarity: {similarity:.2f})")
                return True
        return False

    def _is_valid_entry(self, entry) -> bool:
        """
        Validates a news entry based on the presence and length of its summary and title,
        and the existence of a publication date.

        A missing or None summary/title is treated as empty, so malformed
        entries are rejected instead of raising.

        Parameters:
            entry: The news entry to validate.

        Returns:
            bool: True if the entry has a summary of at least 17 characters, a title of
            at least 5 characters (both after stripping) and a `published` attribute;
            False otherwise.
        """
        title = getattr(entry, 'title', None) or ''
        summary = getattr(entry, 'summary', None) or ''

        if len(summary.strip()) < 17:
            logging.debug(f"Skipping entry with insufficient summary: {title[:50]}...")
            return False

        if len(title.strip()) < 5:
            logging.debug("Skipping entry with insufficient title")
            return False

        if not hasattr(entry, 'published'):
            logging.debug("Skipping entry without published date")
            return False

        return True

    def fetch_and_process_news(self):
        """
        Fetches news articles from the RSS feed, analyzes their sentiment,
        and stores valid and unique entries into the database.

        A failure while processing a single entry is logged and counted as
        skipped; a failure of the fetch itself is logged and re-raised.
        """
        try:
            logging.info("Fetching news from RSS feed...")
            feed = feedparser.parse(NEWS_FEED_URL)
            new_entries_count = 0
            skipped_entries_count = 0

            for entry in feed.entries:
                if not self._is_valid_entry(entry):
                    skipped_entries_count += 1
                    continue

                # Cheap exact match first, fuzzy scan only when needed.
                if entry.title in self.processed_titles or self._is_similar_to_existing(entry.title):
                    logging.debug(f"Skipping duplicate/similar title: {entry.title[:50]}...")
                    skipped_entries_count += 1
                    continue

                try:
                    published = datetime.strptime(entry.published, DATETIME_FORMAT)
                    date_str = published.strftime(DATE_FORMAT)
                    time_str = published.strftime("%H:%M:%S")

                    # Score headline and summary together.
                    combined_text = f"{entry.title} {entry.summary}"
                    sentiment_score = self.sentiment_analyzer.analyze_sentiment(combined_text)

                    self.db_manager.store_score(
                        date=date_str,
                        time=time_str,
                        title=entry.title,
                        summary=entry.summary,
                        score=sentiment_score
                    )

                    self.processed_titles.add(entry.title)
                    new_entries_count += 1

                    logging.info(f"Processed new article: {entry.title[:50]}...")

                except Exception as e:
                    logging.error(f"Error processing entry: {e}")
                    skipped_entries_count += 1
                    continue

            logging.info(f"Added {new_entries_count} new articles, skipped {skipped_entries_count} invalid/duplicate entries")

        except Exception as e:
            logging.error(f"Error fetching news: {e}")
            raise

# Modified run_analysis function
def run_analysis():
    """
    Run the complete pipeline end to end.

    Builds the database manager, sentiment analyzer and news fetcher, pulls and
    scores the latest feed entries, renders the main dashboard and then the
    headline dashboard. Any failure is logged and re-raised; the database
    manager is closed on the way out whenever it was created.
    """
    # Stays None until construction succeeds, so cleanup knows whether to close.
    db_manager = None
    try:
        logging.info("Starting analysis...")

        db_manager = DatabaseManager(DATABASE_NAME)
        fetcher = NewsFetcher(db_manager, SentimentAnalyzer())

        logging.info("Fetching and processing news...")
        fetcher.fetch_and_process_news()

        logging.info("Generating main dashboard...")
        DataVisualizer.create_visualizations(DATABASE_NAME)

        logging.info("Analyzing headlines...")
        analyze_headlines()

        logging.info("Analysis complete!")

    except Exception as e:
        logging.error(f"Error during analysis: {e}")
        raise
    finally:
        if db_manager is not None:
            db_manager.close()

def _headline_table(frame, header_fill, cell_fill, header_font=None, cell_font=None):
    """Build a Date/Time/Headline/Score plotly table from a headlines DataFrame."""
    header = dict(
        values=['Date', 'Time', 'Headline', 'Sentiment Score'],
        fill_color=header_fill,
        align='left'
    )
    cells = dict(
        values=[frame['date'],
                frame['time'],
                frame['title'],
                frame['score'].round(2)],
        fill_color=cell_fill,
        align='left'
    )
    # Fonts are only set when requested so the other tables keep plotly's defaults.
    if header_font is not None:
        header['font'] = header_font
    if cell_font is not None:
        cells['font'] = cell_font
    return go.Table(header=header, cells=cells)


def analyze_headlines():
    """
    Analyzes and displays recent headlines along with their sentiment scores.
    Retrieves statistical data and extreme sentiment headlines, presenting them in a structured dashboard.

    Opens its own DatabaseManager on DATABASE_NAME and closes it before
    returning. Any failure is logged and re-raised.
    """
    # Bound before the try so the finally clause never hits an unbound name
    # (which would mask the real error if the constructor fails).
    db_manager = None
    try:
        db_manager = DatabaseManager(DATABASE_NAME)

        stats = db_manager.get_headline_stats()

        recent_headlines = db_manager.get_headlines_with_scores(limit=10)
        most_positive, most_negative = db_manager.get_extreme_sentiment_headlines()

        # The average may be missing (stats query failed) or None (empty table);
        # neither can be formatted with ':.2f', so fall back to 0.
        average_sentiment = stats.get('average_sentiment')
        if average_sentiment is None:
            average_sentiment = 0

        fig = make_subplots(
            rows=3, cols=1,
            subplot_titles=(
                'Recent Headlines with Sentiment Scores',
                'Most Positive Headlines',
                'Most Negative Headlines'
            ),
            specs=[[{"type": "table"}],
                   [{"type": "table"}],
                   [{"type": "table"}]],
            vertical_spacing=0.1
        )

        # Summary line above the tables.
        fig.add_annotation(
            text=(f"Total Headlines: {stats.get('total_headlines', 'N/A')} | "
                  f"Average Sentiment: {average_sentiment:.2f} | "
                  f"Date Range: {stats.get('date_range', 'N/A')}"),
            xref="paper", yref="paper",
            x=0, y=1.1,
            showarrow=False,
            font=dict(size=12)
        )

        fig.add_trace(
            _headline_table(
                recent_headlines, 'paleturquoise', 'lavender',
                header_font=dict(size=12), cell_font=dict(size=11)
            ),
            row=1, col=1
        )

        fig.add_trace(
            _headline_table(most_positive, 'lightgreen', 'honeydew'),
            row=2, col=1
        )

        fig.add_trace(
            _headline_table(most_negative, 'lightcoral', 'mistyrose'),
            row=3, col=1
        )

        fig.update_layout(
            height=1000,
            title_text="Headlines Analysis Dashboard",
            showlegend=False
        )

        fig.show()

    except Exception as e:
        logging.error(f"Error analyzing headlines: {e}")
        raise
    finally:
        if db_manager is not None:
            db_manager.close()


# Entry point: run the full pipeline only when this module is executed directly
# (not when it is imported).
if __name__ == "__main__":
    run_analysis()

In [3]:
import sqlite3
from difflib import SequenceMatcher
import pandas as pd
from typing import List, Tuple
import logging

def initialize_database(db_path: str = '/content/news_sentiment.db') -> None:
    """
    Create the database and required tables if they don't exist.

    Creates the 'sentiment_scores' table (date, time, title, summary, score) and
    indexes on the 'date', 'title' and 'score' columns. Every statement uses
    IF NOT EXISTS, so calling this function repeatedly is safe.

    Parameters:
        db_path (str): The file path to the SQLite database. Defaults to '/content/news_sentiment.db'.

    Raises:
        Exception: Any error raised while opening or initializing the database is
            logged and then re-raised unchanged.
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            # `with conn` only scopes the transaction (commit on success, rollback
            # on error); it does NOT close the connection, hence the finally below.
            with conn:
                conn.execute('''CREATE TABLE IF NOT EXISTS sentiment_scores (
                                    date TEXT,
                                    time TEXT,
                                    title TEXT,
                                    summary TEXT,
                                    score REAL
                                )''')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON sentiment_scores(date)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_title ON sentiment_scores(title)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_score ON sentiment_scores(score)')
        finally:
            conn.close()

        logging.info(f"Database initialized successfully at {db_path}")
    except Exception as e:
        logging.error(f"Error initializing database: {e}")
        raise

def calculate_similarity(text1: str, text2: str) -> float:
    """
    Return how alike two strings are, ignoring letter case.

    Both inputs are lower-cased and then compared with difflib's SequenceMatcher;
    the matcher's ratio is returned as-is.

    Parameters:
        text1 (str): The first text string.
        text2 (str): The second text string.

    Returns:
        float: The similarity ratio, from 0 (nothing in common) to 1 (identical).
    """
    first = text1.lower()
    second = text2.lower()
    matcher = SequenceMatcher(isjunk=None, a=first, b=second)
    return matcher.ratio()

def find_similar_entries(conn: sqlite3.Connection, similarity_threshold: float = 0.85,
                         window: int = 50) -> List[Tuple[int, int, float]]:
    """
    Find pairs of similar entries in the database.

    Reads every row of the 'sentiment_scores' table and compares each entry with the
    entries that follow it inside a sliding window. A pair is reported only when BOTH
    the title similarity and the summary similarity are strictly greater than
    `similarity_threshold`. Comparisons are case-insensitive and use
    difflib.SequenceMatcher. Missing (NULL / non-string) titles and summaries are
    treated as empty text.

    Parameters:
        conn (sqlite3.Connection): An active SQLite database connection.
        similarity_threshold (float): The threshold above which two entries are considered similar. Defaults to 0.85.
        window (int): Size of the sliding window, counting the entry itself: entry i is
            compared with entries i+1 .. i+window-1 (in query order). Defaults to 50.

    Returns:
        List[Tuple[int, int, float]]: A list of tuples, each containing the rowids of the
        two similar entries (earlier one first, as plain Python ints) and the average of
        their title and summary similarity.
    """
    df = pd.read_sql_query("SELECT rowid, title, summary FROM sentiment_scores", conn)

    # Pull the columns into plain Python lists once: avoids a DataFrame row lookup per
    # comparison and yields built-in ints for the rowids (numpy integers cannot be
    # bound as sqlite3 parameters later on).
    rowids = [int(rowid) for rowid in df['rowid'].tolist()]
    # Lower-case once per entry instead of once per pair; anything that is not a
    # string (NULL -> None/NaN) becomes '' so it cannot crash the comparison.
    titles = [t.lower() if isinstance(t, str) else '' for t in df['title'].tolist()]
    summaries = [s.lower() if isinstance(s, str) else '' for s in df['summary'].tolist()]
    total = len(rowids)

    similar_pairs = []
    for i in range(total):
        for j in range(i + 1, min(i + window, total)):
            title_similarity = SequenceMatcher(None, titles[i], titles[j]).ratio()
            if title_similarity <= similarity_threshold:
                # Titles differ too much; skip the (more expensive) summary check.
                continue
            summary_similarity = SequenceMatcher(None, summaries[i], summaries[j]).ratio()
            if summary_similarity > similarity_threshold:
                similar_pairs.append(
                    (rowids[i], rowids[j], (title_similarity + summary_similarity) / 2)
                )
    return similar_pairs

def remove_duplicates(conn: sqlite3.Connection, similarity_threshold: float = 0.85) -> None:
    """
    Remove duplicate or very similar entries from the database.

    Uses find_similar_entries() to get pairs of similar rows in 'sentiment_scores',
    keeps the first row of every pair and deletes the second ("later") one, then
    commits. A summary of what was removed is printed to stdout.

    Parameters:
        conn (sqlite3.Connection): An active SQLite database connection.
        similarity_threshold (float): The threshold above which two entries are considered similar. Defaults to 0.85.

    Returns:
        None
    """
    similar_pairs = find_similar_entries(conn, similarity_threshold)
    if not similar_pairs:
        print("No duplicate or similar entries found.")
        return

    # The set collapses rows that appear as the "later" entry of several pairs.
    # int(): rowids that come out of a DataFrame may be numpy integers, which
    # sqlite3 refuses to bind as statement parameters.
    entries_to_remove = sorted({int(later_entry) for _, later_entry, _ in similar_pairs})

    cursor = conn.cursor()
    # One single-parameter DELETE per id instead of one giant "IN (?, ?, ...)":
    # the latter fails once the id count exceeds SQLite's bound-parameter limit.
    cursor.executemany(
        "DELETE FROM sentiment_scores WHERE rowid = ?",
        ((rowid,) for rowid in entries_to_remove),
    )
    conn.commit()

    remaining = cursor.execute('SELECT COUNT(*) FROM sentiment_scores').fetchone()[0]
    print(f"Removed {len(entries_to_remove)} duplicate/similar entries.")
    print(f"Remaining entries: {remaining}")

def setup_and_clean_database(db_path: str = '/content/news_sentiment.db') -> None:
    """
    Initialize database if it doesn't exist and clean up duplicates.

    Calls initialize_database() to create the table and indexes when needed, then
    opens a connection and runs remove_duplicates() on it with the default
    similarity threshold. The connection is always closed afterwards.

    Parameters:
        db_path (str): The file path to the SQLite database. Defaults to '/content/news_sentiment.db'.

    Returns:
        None
    """
    initialize_database(db_path)

    conn = sqlite3.connect(db_path)
    try:
        # `with conn` commits on success / rolls back on error, but leaves the
        # connection open -- the finally clause is what actually releases it.
        with conn:
            remove_duplicates(conn)
    finally:
        conn.close()

if __name__ == "__main__":
    # Script entry point: configure INFO-level logging with timestamps, then
    # create the database if needed and purge duplicate entries.
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    setup_and_clean_database()

No duplicate or similar entries found.


In [4]:
# cell 4
import sqlite3

def find_shortest_entries(db_path: str = '/content/news_sentiment.db', num_entries: int = 5) -> None:
    """
    Find and print the entries with the smallest lengths in the database.

    Queries the `sentiment_scores` table for the `num_entries` rows with the shortest
    `title` (ties broken by the shortest `summary`) and prints, for each row, the first
    50 characters of the title and summary followed by "..." and the full length.
    Rows whose title or summary is NULL are printed as empty text with length 0
    instead of aborting the listing.

    Database errors are reported on stdout rather than raised.

    Parameters:
        db_path (str): The file path to the SQLite database. Defaults to '/content/news_sentiment.db'.
        num_entries (int): The number of shortest entries to retrieve and print. Defaults to 5.

    Returns:
        None
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            rows = conn.execute("""
                SELECT title, summary, LENGTH(title) AS title_length, LENGTH(summary) AS summary_length
                FROM sentiment_scores
                ORDER BY title_length ASC, summary_length ASC
                LIMIT ?
            """, (num_entries,)).fetchall()
        finally:
            # sqlite3's own context manager would not close the connection.
            conn.close()
    except sqlite3.Error as e:
        print(f"Error finding shortest entries: {e}")
        return

    for title, summary, title_length, summary_length in rows:
        # NULL columns arrive as None (and LENGTH(NULL) is NULL); they sort first,
        # so without these fallbacks the very first row could raise on slicing.
        title = title or ''
        summary = summary or ''
        print(f"Title: {title[:50]}..., Length: {title_length or 0}")
        print(f"Summary: {summary[:50]}..., Length: {summary_length or 0}")
        print("-" * 20)


# Print the shortest entries (default database path and count) when run directly.
if __name__ == "__main__":
    find_shortest_entries()

Title: Vijay cannot be ignored..., Length: 23
Summary: His speech has drawn admiration and criticism in e..., Length: 62
--------------------
Title: In the heart of the jungle..., Length: 26
Summary: At the Agumbe Rainforest Research Station, Taran d..., Length: 107
--------------------
Title: Fighting high-stakes bypolls..., Length: 28
Summary: Wayanad, Palakkad, and Chelakkara will all witness..., Length: 75
--------------------
Title: Daily Quiz | On the Ballon d’ Or..., Length: 32
Summary: Spanish footballers Rodri and Aitana Bonmati won t..., Length: 160
--------------------
Title: Hanging on to the American Dream..., Length: 32
Summary: The Indian American community is weighing its opti..., Length: 177
--------------------
