# extract news from the Source

In [1]:

import datetime
import os
import ast
import logging
from typing import List, Dict, Any, Optional, Union

import pandas as pd
import numpy as np
import requests
from dotenv import load_dotenv
from newsapi import NewsApiClient
from bs4 import BeautifulSoup
from tqdm import tqdm
from transformers import pipeline 

from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# extract_data function : 

def extract_data():
    """
    Extract news articles from NewsAPI for multiple topics and combine them.
    
    Returns:
        list: Combined list of articles from different topics
    """
    import datetime 
    import os 
    from dotenv import load_dotenv
    from newsapi import NewsApiClient 
    
    # Get current date and date from 7 days ago
    current_date = datetime.datetime.now().strftime('%Y-%m-%d')
    print("Current date:", current_date)
    
    seven_days_ago = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime('%Y-%m-%d')
    print("7 days ago:", seven_days_ago)
    
    # Load API key from .env file
    load_dotenv()
    news_api = os.getenv("NEWS_API")
    
    # Initialize NewsAPI client
    newsapi = NewsApiClient(news_api)
    
    # Define topics to search for
    topics = ['GenAI', 'AI', 'Technology']
    combined_articles = []
    
    # Fetch articles for each topic
    for topic in topics:
        articles = newsapi.get_everything(
            q=topic,
            from_param=seven_days_ago,
            to=current_date,
            language='en',
            sort_by='relevancy',
            page=2
        )
        
        print(f"Fetched {len(articles['articles'])} articles for topic: {topic}")
        combined_articles.extend(articles['articles'])
    
    # Remove duplicate articles (same URL)
    seen_urls = set()
    unique_articles = []
    
    for article in combined_articles:
        if article['url'] not in seen_urls:
            seen_urls.add(article['url'])
            unique_articles.append(article)
    
    print(f"Total unique articles fetched: {len(unique_articles)}")
    return unique_articles


# Transform the data extracted from source 

In [3]:
import pandas as pd
import ast
import requests
from bs4 import BeautifulSoup

def extract_source_name(source):
    """
    Extract the name from a source object which can be either a dictionary or string.
    
    Args:
        source: Source object from NewsAPI (can be dict or string)
        
    Returns:
        str: Extracted source name or 'Unknown' if not found
    """
    if isinstance(source, dict):
        return source.get('name', 'Unknown')
    elif isinstance(source, str):
        try:
            dict_data = ast.literal_eval(source)  # Attempt to parse it as a dictionary
            if isinstance(dict_data, dict):
                return dict_data.get('name', 'Unknown')
        except (SyntaxError, ValueError):
            # If source is a plain string, return it directly
            return source
    return 'Unknown'

def extract_content(url):
    """
    Extract the main content from a webpage given its URL.
    
    Args:
        url (str): URL of the article
        
    Returns:
        str: Extracted text content or error message
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0"}  # Mimic a browser request
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise an error for bad responses
        
        soup = BeautifulSoup(response.text, "html.parser")
        
        # Extract meaningful text (modify based on website structure)
        paragraphs = soup.find_all("p")
        content = " ".join(p.text for p in paragraphs)
        
        return content[:1000]  # Return first 1000 characters to avoid large data
    except Exception as e:
        return f"Error: {e}"

def transform_data(combined_articles):
    """
    Transform the combined articles into a cleaned and structured DataFrame.
    
    Args:
        combined_articles (list): List of article dictionaries from NewsAPI
        
    Returns:
        pandas.DataFrame: Transformed and cleaned DataFrame
    """
    # Convert to DataFrame
    combined_articles_df = pd.DataFrame(combined_articles)
    
    # Drop the urlToImage column
    combined_articles_df = combined_articles_df.drop('urlToImage', axis=1)
    
    # Remove duplicates based on description
    final_df = combined_articles_df.drop_duplicates(subset=["description"], keep='first')
    
    # Format dates
    final_df['publishedAt'] = pd.to_datetime(final_df['publishedAt']).dt.strftime('%Y-%m-%d')
    
    # Apply source name extraction
    final_df.loc[:, 'source'] = final_df['source'].apply(extract_source_name)
    
    # Extract full content from each article URL
    final_df['full_content'] = final_df['url'].apply(extract_content) 
    final_df['publishedAt'] = pd.to_datetime(final_df['publishedAt'])
    
    return final_df

In [4]:

def analyze_sentiment(df: pd.DataFrame, text_column: str = 'full_content') -> pd.DataFrame:
    """
    Apply sentiment analysis to the specified text column in the DataFrame.
    
    Args:
        df: DataFrame containing the text to analyze
        text_column: Column name containing the text to analyze
        
    Returns:
        pd.DataFrame: DataFrame with sentiment analysis results added
        
    Raises:
        ValueError: If the text column doesn't exist
    """
    print("Initializing sentiment analysis pipeline...")
    
    if text_column not in df.columns:
        raise ValueError(f"Text column '{text_column}' not found in DataFrame")
    
    # Initialize sentiment analysis pipeline
    try:
        sentiment_pipeline = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
        print("Sentiment analysis pipeline initialized successfully")
    except Exception as e:
        print(f"Error initializing sentiment analysis pipeline: {str(e)}")
        raise
    # Apply sentiment analysis in batches to avoid memory issues
    print(f"Applying sentiment analysis to {len(df)} articles...")
    batch_size = 32
    results = []
    
    # Process in batches with progress bar
    for i in tqdm(range(0, len(df), batch_size)):
        batch = df[text_column].iloc[i:i+batch_size].fillna("").tolist()
        # Filter out empty strings
        valid_texts = []
        valid_indices = []
        
        for j, text in enumerate(batch):
            if isinstance(text, str) and text.strip():
                valid_texts.append(text)
                valid_indices.append(j)
        
        if valid_texts:
            try:
                batch_results = sentiment_pipeline(valid_texts)
                
                # Place results in the correct positions
                for idx, result in zip(valid_indices, batch_results):
                    while len(results) < i + idx:
                        results.append({"label": "NEUTRAL", "score": 0.5})
                    results.append(result)
            except Exception as e:
                print(f"Error in batch sentiment analysis: {str(e)}")
                # Add neutral results for this batch
                results.extend([{"label": "NEUTRAL", "score": 0.5} for _ in valid_texts])
        
        # Fill in missing results for empty strings
        while len(results) < min(i + batch_size, len(df)):
            results.append({"label": "NEUTRAL", "score": 0.5})
    
    # Ensure we have exactly the right number of results
    if len(results) != len(df):
        print(f"Results length mismatch: {len(results)} vs {len(df)}. Adjusting...")
        if len(results) < len(df):
            results.extend([{"label": "NEUTRAL", "score": 0.5} for _ in range(len(df) - len(results))])
        else:
            results = results[:len(df)]
    
    # Add sentiment results to the dataframe
    df.loc[:, 'sentiment_label'] = [result['label'] for result in results]
    df.loc[:, 'sentiment_score'] = [result['score'] for result in results]
    df.loc[:, 'sentiment_value'] = df['sentiment_label'].map({'POSITIVE': 1, 'NEGATIVE': -1, 'NEUTRAL': 0})
    
    print("Sentiment analysis completed successfully")
    return df

# Load to the big query function :

In [5]:
def load_data_to_bigquery(dataframe, service_account_path='./service_account.json', method='replace'):
    """
    Load a pandas DataFrame to BigQuery.
    
    Parameters:
    -----------
    dataframe : pandas.DataFrame
        The DataFrame to be loaded to BigQuery
    service_account_path : str, default='./service_account.json'
        Path to the Google Cloud service account JSON file
    method : str, default='replace'
        What to do if the table exists. Options: 'fail', 'replace', or 'append'
        
    Returns:
    --------
    int
        Number of rows loaded to BigQuery
    """
    # Load environment variables
    load_dotenv()
    
    # Get BigQuery project, dataset, and table details from environment variables
    project_id = os.getenv("project_id")
    dataset_id = os.getenv("dataset_id")
    table_id = os.getenv("table_id")
    
    if not all([project_id, dataset_id, table_id]):
        raise ValueError("Missing environment variables. Make sure project_id, dataset_id, and table_id are set.")
    
    # Full table reference
    table_ref = f"{dataset_id}.{table_id}"
    
    # Set up credentials
    credentials = service_account.Credentials.from_service_account_file(service_account_path)
    
    # Check if publishedAt is in datetime format
    if 'publishedAt' in dataframe.columns and dataframe['publishedAt'].dtype == 'object':
        dataframe['publishedAt'] = pd.to_datetime(dataframe['publishedAt'])
    
    # Upload to BigQuery with error handling
    try:
        dataframe.to_gbq(
            destination_table=table_ref,
            project_id=project_id,
            if_exists=method,
            credentials=credentials
        )
        print(f"Successfully loaded {len(dataframe)} rows to {table_ref}")
        return len(dataframe)
    except Exception as e:
        print(f"Error loading data to BigQuery: {str(e)}")
        raise

# function call : 


In [6]:
articles = extract_data() 

# Transform data
transformed_df = transform_data(articles) 

# apply anlp for sentiment analysis 
results_df = analyze_sentiment(transformed_df) 




Current date: 2025-04-14
7 days ago: 2025-04-07
Fetched 100 articles for topic: GenAI
Fetched 98 articles for topic: AI
Fetched 100 articles for topic: Technology
Total unique articles fetched: 290


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['publishedAt'] = pd.to_datetime(final_df['publishedAt']).dt.strftime('%Y-%m-%d')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['full_content'] = final_df['url'].apply(extract_content)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['publishedAt'] = pd.to_datetime(final_df

Initializing sentiment analysis pipeline...


Device set to use mps:0


Sentiment analysis pipeline initialized successfully
Applying sentiment analysis to 281 articles...


100%|██████████| 9/9 [00:10<00:00,  1.16s/it]

Sentiment analysis completed successfully





TypeError: load_data_to_bigquery() got an unexpected keyword argument 'df'

# continue changing 

## code to write results to the big query 

In [7]:
#  Authenticating from big query 
from google.cloud import bigquery
import os 

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "service_account.json"
# Initialize BigQuery client
client = bigquery.Client()

In [8]:


load_dotenv()
project_id = os.getenv("project_id")
dataset_id = os.getenv("dataset_id")
table_id = os.getenv("table_id")
table_ref = f"{project_id}.{dataset_id}.{table_id}"



In [32]:
query = f"""
    SELECT  *
    FROM `{table_ref}`
    ORDER BY publishedAt DESC;
"""

In [33]:
# Run the query
query_job = client.query(query)

# Fetch results
results_gcp = query_job.result()

In [None]:
results_gcp

In [None]:
import pandas as pd

results__read_gcp_df =results_gcp.to_dataframe()

In [None]:

results__read_gcp_df

In [None]:
# from google.oauth2 import service_account
# credentials = service_account.Credentials.from_service_account_file(
#     './service_account.json'
# )

In [None]:
# credentials

In [23]:
# results_df['publishedAt'] = pd.to_datetime(results_df['publishedAt'])

In [None]:
# results_df.to_gbq(
#     destination_table='news_dataset.news_articles',  # Replace with your dataset and desired table name
#     project_id=project_id,  # Replace with your actual project ID
#     if_exists='append',  # Change to 'append' if you want to add to existing data
#     credentials=credentials
# )

In [None]:
# print(results_df['publishedAt'].dtype)
# print(results_df['publishedAt'].head())
# # Check for null values
# print(results_df['publishedAt'].isna().sum())