In [5]:
import calendar
import sqlite3
import time
from datetime import datetime, timezone, date # Added date

import pandas as pd # For potential display, though not strictly needed for pipeline logic

# --- Essential config (adapted from Notebook 01) ---
# SQLite file used by every connection in this notebook.
DATABASE_NAME = "trend_analyzer.db"

# Display name -> RSS feed URL. Use feeds that worked in Notebook 02.
NEWS_RSS_FEEDS = {
    "Guardian Environment": "https://www.theguardian.com/environment/rss",
    "Ars Technica": "http://feeds.arstechnica.com/arstechnica/index/",  # Example, replace with relevant ones
}

# Source key -> subreddit name (without the "r/" prefix).
REDDIT_SUBREDDITS = {
    "RenewableEnergySub": "RenewableEnergy",
}

# Per-source item cap; kept small for pipeline testing in the notebook.
REDDIT_POST_LIMIT = 5

print("--- Notebook 4: Data Storage and Pipeline ---")
print(f"Current time: {pd.Timestamp.now(tz='Asia/Kolkata')}")
print(f"Using Database: {DATABASE_NAME}")

# --- Database Manager Functions (copied/adapted from Notebook 01 & previous cells) ---
def create_connection():
    """Open a sqlite3 connection to ``DATABASE_NAME``.

    Returns:
        The new connection, or ``None`` (after printing the error) when
        ``sqlite3.connect`` raises ``sqlite3.Error``.
    """
    try:
        return sqlite3.connect(DATABASE_NAME)
    except sqlite3.Error as err:
        print(f"DB Connection Error: {err}")
        return None

def _to_sqlite_timestamp(value):
    """Serialize date/datetime exactly as sqlite3's default adapters did; pass other values through.

    The implicit adapters are deprecated since Python 3.12, so the conversion is
    done explicitly here to keep the stored text format unchanged.
    """
    if isinstance(value, datetime):  # check datetime first: it subclasses date
        return value.isoformat(" ")
    if isinstance(value, date):
        return value.isoformat()
    return value


def insert_article_data_pipeline(conn, article_list):
    """Insert articles whose URL is not stored yet and map each new URL to its row id.

    Args:
        conn: An open sqlite3 connection. It is committed (or rolled back) but
            not closed here; the caller owns it.
        article_list: Article dicts with keys ``'url'``, ``'source_name'``,
            ``'title'``, ``'full_content'`` and optionally ``'publication_date'``.

    Returns:
        ``{url: article_id}`` for rows inserted *and committed* by this call.
        A URL appearing several times in ``article_list`` is inserted once (its
        first occurrence). Returns ``{}`` when ``conn``/``article_list`` is
        falsy or when a database error forced a rollback.
    """
    inserted_articles_map = {}  # url -> new_id
    if not conn or not article_list:
        return inserted_articles_map

    try:
        cursor = conn.cursor()
        # Known URLs: those already in the DB plus those inserted in this batch,
        # so duplicates inside one scrape are skipped too.
        cursor.execute("SELECT source_url FROM articles")
        seen_urls = {row[0] for row in cursor.fetchall()}

        for article in article_list:
            url = article['url']
            if url in seen_urls:
                continue
            try:
                cursor.execute("""
                    INSERT INTO articles (source_url, source_name, title, raw_content, publication_date)
                    VALUES (?, ?, ?, ?, ?)
                """, (url, article['source_name'], article['title'],
                      article['full_content'], _to_sqlite_timestamp(article.get('publication_date'))))
            except sqlite3.IntegrityError as e:
                # A constraint rejected this row (e.g. a concurrent insert of the same URL).
                print(f"Skipping article {url} (integrity error): {e}")
                continue
            except Exception as e:
                print(f"Error inserting article {url}: {e}")
                continue
            inserted_articles_map[url] = cursor.lastrowid
            seen_urls.add(url)
        conn.commit()
    except sqlite3.Error as e:
        print(f"DB error during bulk article insertion: {e}")
        # Nothing from this batch was committed; discard the pending rows so
        # the caller never receives ids of rows that do not exist.
        conn.rollback()
        return {}
    return inserted_articles_map

def insert_nlp_data_pipeline(conn, article_id, sentiment, keywords, entities):
    """Store sentiment, keywords and entities for one article in a single transaction.

    Args:
        conn: An open sqlite3 connection (not closed here).
        article_id: Id of the ``articles`` row these results belong to.
        sentiment: Dict with ``'score'`` and ``'label'``.
        keywords: Dict ``{keyword: score}``; ``None`` is treated as empty.
        entities: Dict ``{entity_text: entity_label}``; ``None`` is treated as empty.

    Returns:
        ``True`` when all rows were committed. ``False`` when ``conn`` is falsy or
        a ``sqlite3.Error`` occurred; in that case every row written by this call
        is rolled back, so no partial NLP data is left pending on ``conn``.
    """
    if not conn:
        return False
    try:
        # The connection context manager commits on success and rolls back on
        # any exception, making the three inserts all-or-nothing.
        with conn:
            conn.execute("""
                INSERT INTO sentiments (article_id, sentiment_score, sentiment_label)
                VALUES (?, ?, ?)
            """, (article_id, sentiment['score'], sentiment['label']))
            conn.executemany(
                "INSERT INTO keywords (article_id, keyword, score) VALUES (?, ?, ?)",
                [(article_id, keyword, score) for keyword, score in (keywords or {}).items()],
            )
            conn.executemany(
                "INSERT INTO entities (article_id, entity_text, entity_label) VALUES (?, ?, ?)",
                [(article_id, text, label) for text, label in (entities or {}).items()],
            )
    except sqlite3.Error as e:
        # Report and let the pipeline continue with the next article.
        print(f"Error inserting NLP data for article_id {article_id}: {e}")
        return False
    return True

# --- Scraper Functions (copied/adapted from Notebook 02) ---
import requests
import feedparser
from newspaper import Article as NewspaperArticle, ArticleException

def parse_datetime(date_string): # Copied from Notebook 2, Cell 2
    """Best-effort conversion of a feed/API date value into an aware datetime.

    Accepted inputs:
      * ``int``/``float``: a Unix timestamp, interpreted in UTC.
      * ``str``: tried against ``common_formats`` in order; a naive result is
        assumed to be UTC.
      * ``time.struct_time`` (anything with ``tm_year``): e.g. feedparser's
        ``*_parsed`` fields, which feedparser normalizes to UTC.

    Returns:
        ``None`` for a falsy input. Otherwise an aware ``datetime``; values that
        cannot be parsed fall back to the current UTC time.
    """
    if not date_string:
        return None
    common_formats = ['%a, %d %b %Y %H:%M:%S %z', '%a, %d %b %Y %H:%M:%S %Z', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%d %H:%M:%S']
    if isinstance(date_string, (int, float)):
        try:
            return datetime.fromtimestamp(date_string, timezone.utc)
        except (OverflowError, OSError, ValueError):
            pass  # out-of-range timestamp: use the "now" fallback below
    if isinstance(date_string, str):
        for fmt in common_formats:
            try:
                dt_object = datetime.strptime(date_string, fmt)
            except ValueError:
                continue
            if dt_object.tzinfo is None or dt_object.tzinfo.utcoffset(dt_object) is None:
                dt_object = dt_object.replace(tzinfo=timezone.utc)
            return dt_object
    elif hasattr(date_string, 'tm_year'):
        # The struct is in UTC; time.mktime would treat it as *local* time and
        # shift the result by the machine's UTC offset. calendar.timegm is the
        # UTC inverse of time.gmtime.
        try:
            return datetime.fromtimestamp(calendar.timegm(date_string), timezone.utc)
        except (OverflowError, OSError, ValueError):
            pass
    return datetime.now(timezone.utc)

def fetch_article_content(url): # Copied from Notebook 2, Cell 3 (corrected)
    """Download and parse *url* with newspaper; return ``(text, publish_date)``.

    ``publish_date`` is made timezone-aware (UTC) when newspaper returns a naive
    value. Returns ``(None, None)`` when *url* is not an http(s) string or when
    downloading/parsing fails for any reason (best effort: one bad link must not
    stop a scrape). ``KeyboardInterrupt``/``SystemExit`` are no longer swallowed.
    """
    if not isinstance(url, str) or not url.startswith(('http://', 'https://')):
        return None, None
    try:
        article_obj = NewspaperArticle(url, fetch_images=False, memoize_articles=False)
        article_obj.download()
        article_obj.parse()
        text = article_obj.text
        pub_date = article_obj.publish_date
        if pub_date and (pub_date.tzinfo is None or pub_date.tzinfo.utcoffset(pub_date) is None):
            pub_date = pub_date.replace(tzinfo=timezone.utc)
        return text, pub_date
    except Exception:  # ArticleException, network and parser errors alike
        return None, None

def scrape_rss_feed_pipeline(source_name, rss_url, limit=None): # Adapted from Notebook 2, Cell 4 (corrected)
    """Fetch an RSS/Atom feed and return article dicts for its first entries.

    Args:
        source_name: Label stored in each article's ``'source_name'``.
        rss_url: URL of the feed.
        limit: Maximum number of feed entries to process. ``None`` (default)
            falls back to ``REDDIT_POST_LIMIT``, the previously hard-wired value.

    Returns:
        A list of dicts with keys ``'title'``, ``'url'``, ``'publication_date'``,
        ``'full_content'`` and ``'source_name'``. If the feed cannot be fetched,
        the error is printed and an empty list is returned; an error in one
        entry is printed and only that entry is skipped.
    """
    if limit is None:
        limit = REDDIT_POST_LIMIT
    articles = []
    headers = {'User-Agent': 'Mozilla/5.0 (compatible; TrendAnalyzerBot-Pipeline/0.1)', 'Accept': 'application/xml,text/xml,*/*'}
    try:
        response = requests.get(rss_url, headers=headers, timeout=15)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
    except Exception as e:
        print(f"Error scraping RSS {source_name} ({rss_url}): {e}")
        return articles

    for entry in feed_data.entries[:limit]:
        try:
            title = entry.get('title')
            url = entry.get('link')
            if not (title and url):
                continue
            pub_date_parsed = entry.get('published_parsed') or entry.get('updated_parsed')
            pub_date_feed = parse_datetime(pub_date_parsed) if pub_date_parsed else None
            full_content, pub_date_article = fetch_article_content(url)
        except Exception as e:
            print(f"Error processing RSS entry from {source_name}: {e}")
            continue
        # Prefer the feed's date, then the article page's date, then "now".
        final_pub_date = pub_date_feed or pub_date_article or datetime.now(timezone.utc)
        content_to_store = full_content if full_content and full_content.strip() else f"Content not retrievable - {title}"
        articles.append({'title': title, 'url': url, 'publication_date': final_pub_date, 'full_content': content_to_store, 'source_name': source_name})
        time.sleep(0.1)  # Be polite
    return articles

def scrape_reddit_forum_pipeline(source_name_prefix, subreddit_name, limit): # Adapted from Notebook 2, Cell 5
    """Fetch the newest posts of a subreddit through its public JSON listing.

    Content per post: the selftext for self posts; for link posts, the text
    fetched from ``url_overridden_by_dest`` (falling back to the title); the
    title otherwise. Posts without a title or permalink are skipped.

    Returns:
        A list of dicts with keys ``'title'``, ``'url'``, ``'publication_date'``,
        ``'full_content'`` and ``'source_name'``. Any error is printed and the
        posts collected so far are returned.
    """
    collected = []
    request_headers = {'User-agent': f'Mozilla/5.0 (compatible; {source_name_prefix}_TrendAnalyzerBot-Pipeline/0.1)'}
    try:
        listing_url = f"https://www.reddit.com/r/{subreddit_name}/new.json?limit={limit}"
        resp = requests.get(listing_url, headers=request_headers, timeout=15)
        resp.raise_for_status()
        payload = resp.json()
        if 'data' not in payload or 'children' not in payload['data']:
            return collected

        display_name = f"{source_name_prefix} r/{subreddit_name}"
        for child in payload['data']['children']:
            fields = child['data']
            post_title = fields.get('title')
            link = fields.get('permalink')
            if not post_title or not link:
                continue

            post_url = f"https://www.reddit.com{link}"
            published = parse_datetime(fields.get('created_utc'))

            if fields.get('is_self', False):
                body = fields.get('selftext', '')
            elif 'url_overridden_by_dest' in fields:
                fetched, _ = fetch_article_content(fields['url_overridden_by_dest'])
                body = fetched if fetched else post_title
            else:
                body = post_title

            if not (body and body.strip()):
                body = f"Content not retrievable or empty - {post_title}"

            collected.append({
                'title': post_title,
                'url': post_url,
                'publication_date': published,
                'full_content': body,
                'source_name': display_name,
            })
            time.sleep(0.2)  # Be polite
    except Exception as exc:
        print(f"Error scraping Reddit r/{subreddit_name}: {exc}")
    return collected

# --- NLP Processor Functions (copied/adapted from Notebook 03) ---
import spacy
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from sklearn.feature_extraction.text import TfidfVectorizer

# Load the NLP models once; a failed load leaves the model as None so the
# pipeline functions below can fall back to default results.
try:
    nlp_spacy_model = spacy.load("en_core_web_sm")
except OSError:  # spacy.load raises OSError when it cannot find the model
    nlp_spacy_model = None
    print("spaCy model 'en_core_web_sm' not loaded for pipeline.")

try:
    vader_model = SentimentIntensityAnalyzer()
except Exception:
    vader_model = None
    print("VADER model not initialized for pipeline.")

def get_sentiment_pipeline(text):
    """Classify *text* by VADER's compound polarity score.

    Returns:
        ``{'score': compound, 'label': label}`` where ``label`` is
        ``'positive'`` for compound >= 0.05, ``'negative'`` for <= -0.05 and
        ``'neutral'`` otherwise. Empty or non-string input, or a missing VADER
        model, yields ``{'score': 0.0, 'label': 'neutral'}``.
    """
    if not (text and isinstance(text, str) and vader_model):
        return {'score': 0.0, 'label': 'neutral'}
    compound = vader_model.polarity_scores(text)['compound']
    label = ('positive' if compound >= 0.05
             else 'negative' if compound <= -0.05
             else 'neutral')
    return {'score': compound, 'label': label}

def get_entities_pipeline(text):
    """Extract named entities from *text* with the loaded spaCy model.

    The text is truncated to the model's ``max_length`` before parsing.

    Returns:
        A dict mapping stripped entity text to its spaCy label. When the same
        text occurs more than once, the label of the last occurrence wins.
        ``{}`` for empty/non-string input or when the model is not loaded.
    """
    if not (text and isinstance(text, str) and nlp_spacy_model):
        return {}
    parsed = nlp_spacy_model(text[:nlp_spacy_model.max_length])
    return {ent.text.strip(): ent.label_ for ent in parsed.ents}

def get_keywords_tfidf_pipeline(text_list, num_keywords=10):
    """Return the top TF-IDF terms (unigrams and bigrams) of the first document.

    The vectorizer is fitted on the non-empty strings of *text_list*; scores are
    read from row 0 of the fitted matrix, i.e. the first non-empty text. With a
    single-document list (as ``process_text_content_pipeline`` passes), every
    term has the same IDF, so the ranking is effectively normalized term
    frequency.

    Args:
        text_list: Sequence of texts.
        num_keywords: Maximum number of terms to return.

    Returns:
        ``{term: score}`` (plain ``float`` scores, highest first). ``{}`` when
        *text_list* is empty, contains a truthy non-string or whitespace-only
        entry, has no non-empty text, or no term survives tokenization and
        stop-word removal.
    """
    if not text_list or not all(isinstance(s, str) and s.strip() for s in text_list if s):
        return {}
    valid_texts = [s for s in text_list if s and s.strip()]
    if not valid_texts:
        return {}
    try:
        # spaCy's stop-word set when the model is loaded, else sklearn's built-in English list.
        stop_words = list(nlp_spacy_model.Defaults.stop_words) if nlp_spacy_model else 'english'
        vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=1000, ngram_range=(1, 2),
                                     token_pattern=r'(?u)\b\w[\w-]*\w\b|\b\w\w+\b')
        tfidf_matrix = vectorizer.fit_transform(valid_texts)
        feature_names = vectorizer.get_feature_names_out()
        doc_vector = tfidf_matrix[0]  # sparse row for the first valid text
        tfidf_scores = {feature_names[col]: float(doc_vector[0, col]) for col in doc_vector.nonzero()[1]}
        ranked = sorted(tfidf_scores.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:num_keywords])
    except ValueError:
        # e.g. sklearn's "empty vocabulary" error when the text contains only stop words.
        return {}
    except Exception as e:
        print(f"TF-IDF keyword extraction failed: {e}")
        return {}

def process_text_content_pipeline(text_content):
    """Run sentiment, entity and keyword extraction on a single text.

    Returns:
        A ``(sentiment, keywords, entities)`` tuple. When the text is not a
        non-blank string, or either NLP model failed to load, returns neutral
        sentiment and two empty dicts.
    """
    text_ok = isinstance(text_content, str) and text_content.strip() != ""
    if not text_ok or not (nlp_spacy_model and vader_model):
        return {'score': 0.0, 'label': 'neutral'}, {}, {}
    sentiment_result = get_sentiment_pipeline(text_content)
    entity_map = get_entities_pipeline(text_content)
    keyword_map = get_keywords_tfidf_pipeline([text_content])
    return sentiment_result, keyword_map, entity_map

print("\nAll necessary functions and configurations loaded/redefined for Notebook 4 pipeline.")
# Warn once here; the NLP helpers silently return defaults when a model is missing.
if not (nlp_spacy_model and vader_model):
    print("WARNING: One or both NLP models (spaCy, VADER) did not load. NLP processing will be skipped or return defaults.")

--- Notebook 4: Data Storage and Pipeline ---
Current time: 2025-05-23 09:45:14.831961+05:30
Using Database: trend_analyzer.db

All necessary functions and configurations loaded/redefined for Notebook 4 pipeline.


In [7]:
def _scrape_all_sources():
    """Scrape every configured RSS feed and subreddit; return the combined article dicts."""
    scraped = []
    for name, url in (NEWS_RSS_FEEDS or {}).items():
        print(f"  Scraping RSS: {name}...")
        scraped.extend(scrape_rss_feed_pipeline(name, url))
    for key, sub_name in (REDDIT_SUBREDDITS or {}).items():
        print(f"  Scraping Reddit: r/{sub_name}...")
        scraped.extend(scrape_reddit_forum_pipeline(key, sub_name, limit=REDDIT_POST_LIMIT))
    return scraped


def _store_nlp_for_new_articles(conn, scraped_articles, inserted_map):
    """Run NLP once per newly inserted article and store it; return how many were stored."""
    # Keep the first scraped copy of each URL so an article id that appears
    # several times in the scrape is analysed (and stored) only once.
    articles_by_url = {}
    for article in scraped_articles:
        articles_by_url.setdefault(article['url'], article)

    stored = 0
    for url, article_id in inserted_map.items():
        content = articles_by_url.get(url, {}).get('full_content', '')
        if not (content and content.strip()):
            continue  # nothing to analyse
        sentiment, keywords, entities = process_text_content_pipeline(content)
        if insert_nlp_data_pipeline(conn, article_id, sentiment, keywords, entities):
            stored += 1
    return stored


def run_full_pipeline_notebook():
    """Scrape all sources, store new articles, then store NLP results for them.

    Daily trend calculation is not done here (see Notebook 05).

    NOTE: the ``articles``, ``sentiments``, ``keywords`` and ``entities`` tables
    must already exist (created by Notebook 01's ``create_tables()``); this
    function does not create them, so inserts fail otherwise.
    """
    print(f"\n--- Starting full data pipeline at {pd.Timestamp.now(tz='Asia/Kolkata')} ---")
    start_time_total = time.time()

    # 1. Scrape data
    print("\n--- Step 1: Scraping Data ---")
    scraped_articles_list = _scrape_all_sources()
    if not scraped_articles_list:
        print("No articles/posts scraped in this run. Pipeline ending.")
        return
    print(f"Total {len(scraped_articles_list)} articles/posts scraped from all sources.")

    # 2. Insert new articles and get their ids
    print("\n--- Step 2: Inserting New Articles into Database ---")
    db_conn_pipeline = create_connection()
    if not db_conn_pipeline:
        print("Failed to connect to database. Cannot proceed with article insertion.")
        return

    # One connection serves both the article and the NLP inserts; it is always
    # closed, even if a step raises.
    try:
        newly_inserted_articles_map = insert_article_data_pipeline(db_conn_pipeline, scraped_articles_list)
        if not newly_inserted_articles_map:
            print("No new unique articles were inserted into the database during this run.")
        else:
            print(f"{len(newly_inserted_articles_map)} new unique articles were inserted into the database.")

        # 3. NLP for the newly inserted articles only
        print("\n--- Step 3: Processing NLP and Storing Results for New Articles ---")
        if not newly_inserted_articles_map:
            print("No new articles to process with NLP.")
        elif not (nlp_spacy_model and vader_model):
            print("NLP models (spaCy/VADER) not loaded. Skipping NLP processing step.")
        else:
            processed_nlp_count = _store_nlp_for_new_articles(
                db_conn_pipeline, scraped_articles_list, newly_inserted_articles_map)
            print(f"NLP processing and storage completed for {processed_nlp_count} new articles.")
    finally:
        db_conn_pipeline.close()

    # 4. Daily trends are computed in Notebook 5
    print("\n--- Step 4: Daily Trends Calculation ---")
    print("To calculate/update daily trends based on all data (including newly added),")
    print("please proceed to and run Notebook 05_Analytics_Engine.ipynb.")

    end_time_total = time.time()
    print(f"\n--- Full data pipeline run (Scraping, NLP for new, Storing) finished in {end_time_total - start_time_total:.2f} seconds ---")

# --- Execute the Full Pipeline ---
# Before running: NEWS_RSS_FEEDS (first cell) should list feeds that worked in
# Notebook 02, and the database tables must already exist (created by
# Notebook 01's create_tables()); this cell does not create them.
# The condition holds in a notebook kernel, where __file__ is not defined, and
# is false when this code runs as a .py script.
if '__file__' not in globals() and __name__ == '__main__':
    run_full_pipeline_notebook()


--- Starting full data pipeline at 2025-05-23 09:46:42.999598+05:30 ---

--- Step 1: Scraping Data ---
  Scraping RSS: Guardian Environment...
  Scraping RSS: Ars Technica...
  Scraping Reddit: r/RenewableEnergy...
Total 15 articles/posts scraped from all sources.

--- Step 2: Inserting New Articles into Database ---
No new unique articles were inserted into the database during this run.

--- Step 3: Processing NLP and Storing Results for New Articles ---
No new articles to process with NLP.

--- Step 4: Daily Trends Calculation ---
To calculate/update daily trends based on all data (including newly added),
please proceed to and run Notebook 05_Analytics_Engine.ipynb.

--- Full data pipeline run (Scraping, NLP for new, Storing) finished in 18.87 seconds ---
