In [None]:
import requests
import pandas as pd
import time
from datetime import datetime
from nltk.sentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
from transformers import pipeline

# NewsAPI credential sent with every request made by get_market_news().
# NOTE(review): this key is hard-coded in the notebook; prefer loading it from
# an environment variable or secret store, and rotate this value.
api_key = 'afc3fe9ac08745439bf521cb5b974fbc'

# Sentiment back-ends shared by the helper functions below. The BERT model's
# labels are the "1 star" .. "5 stars" strings handled by bert_to_vader_scale().
vader_analyzer = SentimentIntensityAnalyzer()
bert_sentiment = pipeline("sentiment-analysis", model="nlptown/bert-base-multilingual-uncased-sentiment")

# Tickers whose news is fetched, one NewsAPI query per ticker.
tickers = [
    'AAPL', 'GOOGL', 'MSFT', 'ASTS', 'PTON', 'GSAT', 'PLTR', 'SMR', 'ACHR',
    'BWXT', 'ARBK', 'AMD', 'NVDA', 'GME', 'MU', 'TSLA', 'NFLX', 'ZG',
    'AVGO', 'SMCI', 'GLW', 'HAL', 'LMT', 'AMZN', 'CRM', 'NOW', 'CHTR', 'TDS', 'META'
]

# Current UTC date as YYYY-MM-DD. time.gmtime() is used instead of
# datetime.utcnow(), which is deprecated since Python 3.12.
today = time.strftime('%Y-%m-%d', time.gmtime())

# Functions for sentiment analysis
def vader_sentiment(text):
    """Return the 'compound' score from vader_analyzer for ``text``.

    Falsy input (``None`` or an empty string) is not analysed and yields 0.
    """
    if not text:
        return 0
    scores = vader_analyzer.polarity_scores(text)
    return scores['compound']

def textblob_sentiment(text):
    """Return the TextBlob sentiment polarity of ``text``.

    Falsy input (``None`` or an empty string) is not analysed and yields 0.
    """
    if not text:
        return 0
    blob = TextBlob(text)
    return blob.sentiment.polarity

def bert_sentiment_analysis(text):
    """Classify ``text`` with the module-level ``bert_sentiment`` pipeline.

    Returns:
        A ``(label, score)`` tuple taken from the first pipeline result, or
        ``("NEUTRAL", 0.0)`` when ``text`` is falsy (None / empty string).
    """
    if not text:
        return "NEUTRAL", 0.0
    top_prediction = bert_sentiment(text)[0]
    label = top_prediction['label']
    confidence = top_prediction['score']
    return label, confidence

def bert_to_vader_scale(label, confidence):
    """Convert a star-rating label plus confidence into a signed score.

    "1 star" .. "5 stars" map linearly onto -1.0 .. 1.0 in steps of 0.5; any
    other label (for example "NEUTRAL") maps to 0.0. The mapped value is then
    multiplied by ``confidence``.
    """
    star_polarity = {
        "1 star": -1.0,
        "2 stars": -0.5,
        "3 stars": 0.0,
        "4 stars": 0.5,
        "5 stars": 1.0,
    }
    polarity = star_polarity.get(label, 0.0)
    return polarity * confidence

# Function to fetch market news for the current day
def get_market_news(ticker, max_retries=1, retry_delay=5, timeout=10):
    """Fetch today's NewsAPI articles matching ``ticker``.

    Args:
        ticker: Search term sent as the ``q`` query parameter.
        max_retries: How many times to retry after an HTTP 429 response.
        retry_delay: Seconds to sleep before each retry.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The list under the response's ``articles`` key, or an empty list when
        the request fails, is still rate limited after the retries, or the
        response carries no articles.
    """
    # Let requests build the query string so special characters are URL-encoded.
    params = {
        'q': ticker,
        'from': today,
        'to': today,
        'sortBy': 'publishedAt',
        'apiKey': api_key,
    }

    for attempt in range(max_retries + 1):
        try:
            # A timeout keeps one stalled connection from hanging the whole run.
            response = requests.get('https://newsapi.org/v2/everything', params=params, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Error fetching data for {ticker}: {exc}")
            return []

        if response.status_code == 200:
            # 'articles' may be missing or null; normalise both to an empty list.
            return response.json().get('articles') or []

        if response.status_code != 429:
            print(f"Error fetching data for {ticker}: {response.status_code}")
            return []

        if attempt < max_retries:
            print(f"Rate limit exceeded for {ticker}, retrying after delay...")
            time.sleep(retry_delay)

    print(f"Rate limit exceeded for {ticker}, giving up after {max_retries} retries.")
    return []

# Save data in the required schema
def save_to_csv(news_data, filename="news_data_today.csv"):
    """Write ``news_data`` to ``filename`` as CSV (no index column) and report it.

    ``news_data`` is passed straight to ``pd.DataFrame``; with a list of
    dicts each dict becomes one row.
    """
    pd.DataFrame(news_data).to_csv(filename, index=False)
    print(f"Data saved to {filename}")

# Fetch and process news for all tickers
all_news = []
for ticker in tickers:
    print(f"Fetching news for {ticker}...")
    articles = get_market_news(ticker)

    for article in articles:
        title = article.get('title', '')
        summary = article.get('description', '')

        # Headline is scored with VADER only; the description is scored with
        # TextBlob, VADER and BERT.
        headline_score = vader_sentiment(title)
        textblob_score = textblob_sentiment(summary)
        vader_score = vader_sentiment(summary)
        bert_label, bert_confidence = bert_sentiment_analysis(summary)
        # BERT star label rescaled and weighted by its confidence.
        bert_scaled = bert_to_vader_scale(bert_label, bert_confidence)

        publisher = article.get('source', {}).get('name', '')

        # One CSV row per article; key order determines the column order.
        all_news.append({
            'ticker': ticker,
            'title': title,
            'headline_vader_sentiment': headline_score,
            'summary': summary,
            'summary_textblob_sentiment': textblob_score,
            'summary_vader_sentiment': vader_score,
            'summary_bert_sentiment': bert_label,
            'bert_confidence': bert_confidence,
            'summary_bert_vader_scaled': bert_scaled,
            'publisher': publisher,
            'link': article.get('url', ''),
            'publish_date': article.get('publishedAt', ''),
            'type': 'general',  # fixed placeholder value
            'related_tickers': '',  # left empty
            'source': 'NewsAPI',  # marks where the row came from
        })

    # Pause between tickers to stay under the API rate limit.
    time.sleep(1)

# Write the CSV only when at least one article was collected.
if not all_news:
    print("No news data available.")
else:
    save_to_csv(all_news)








In [None]:
import os
import logging
from typing import Optional, Tuple, Dict, Any
from datetime import datetime, timedelta

import pandas as pd
from google.cloud import bigquery
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from transformers import pipeline


class NewsDataProcessorError(Exception):
    """Error type raised by NewsDataProcessor (validation, client setup, table creation)."""


class NewsDataProcessor:
    """Score market-news rows for sentiment and append them to a BigQuery table.

    Rows are read from a source table, rows whose ``publish_date`` already
    exists in the target table are dropped, word-count / VADER / BERT columns
    are added, and the result is appended to the target table.
    """

    def __init__(self, project_id: str, dataset_id: str, logger: Optional[logging.Logger] = None):
        """Create the BigQuery client and the VADER analyzer.

        Args:
            project_id: Non-empty project id used for the client and table references.
            dataset_id: Non-empty dataset id that holds the source and target tables.
            logger: Logger to use; a console logger is created when omitted.

        Raises:
            NewsDataProcessorError: If an id is invalid or the BigQuery client
                cannot be created.
        """
        self._validate_input_parameters(project_id, dataset_id)
        self.logger = logger or self._setup_logger()
        try:
            self.client = bigquery.Client(project=project_id)
        except Exception as e:
            self.logger.error(f"Failed to initialize BigQuery client: {e}")
            # Chain the cause so the original traceback is preserved.
            raise NewsDataProcessorError(f"BigQuery client initialization failed: {e}") from e
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.vader_analyzer = SentimentIntensityAnalyzer()
        # The BERT pipeline is built lazily on first use (see bert_pipeline).
        self._bert_pipeline = None

    def _validate_input_parameters(self, project_id: str, dataset_id: str):
        """Raise NewsDataProcessorError unless both ids are non-empty strings."""
        if not project_id or not isinstance(project_id, str):
            raise NewsDataProcessorError("Invalid project_id. Must be a non-empty string.")
        if not dataset_id or not isinstance(dataset_id, str):
            raise NewsDataProcessorError("Invalid dataset_id. Must be a non-empty string.")

    def _setup_logger(self) -> logging.Logger:
        """Return the class-named logger, attaching a DEBUG console handler once.

        ``logging.getLogger`` returns the same logger object for the same
        name, so the handler is only added when none is attached yet;
        otherwise every new instance would duplicate each log line.
        """
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.DEBUG)  # Set to DEBUG for detailed logs
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        return logger

    @property
    def bert_pipeline(self):
        """The sentiment-analysis pipeline, created on first access and then reused."""
        if self._bert_pipeline is None:
            self._bert_pipeline = pipeline("sentiment-analysis")
        return self._bert_pipeline

    def calculate_vader_sentiment(self, text: Optional[str]) -> float:
        """Return the VADER 'compound' score for ``text``.

        Returns 0.0 for empty or non-string input and when the analyzer raises.
        """
        if not text or not isinstance(text, str):
            return 0.0
        try:
            sentiment = self.vader_analyzer.polarity_scores(text)
            return sentiment.get("compound", 0.0)
        except Exception as e:
            self.logger.warning(f"VADER sentiment analysis failed: {e}")
            return 0.0

    def calculate_bert_sentiment(self, text: Optional[str]) -> Tuple[float, float]:
        """Return ``(sentiment_score, confidence)`` for ``text`` from the BERT pipeline.

        ``confidence`` is the pipeline's score for its predicted label.
        ``sentiment_score`` is ``2 * confidence - 1``, kept positive for a
        "POSITIVE" label and negated for any other label. Returns
        ``(0.0, 0.0)`` for empty or non-string input and when the pipeline raises.
        """
        if not text or not isinstance(text, str):
            return 0.0, 0.0
        try:
            result = self.bert_pipeline(text)[0]
            confidence = result["score"]
            # Maps a score of 0.5..1 onto a magnitude of 0..1.
            # NOTE(review): assumes a two-class model whose winning score is
            # always >= 0.5 - confirm for the pipeline's default model.
            magnitude = (confidence * 2) - 1
            sentiment_score = magnitude if result["label"] == "POSITIVE" else -magnitude
            return sentiment_score, confidence
        except Exception as e:
            self.logger.warning(f"BERT sentiment analysis failed: {e}")
            return 0.0, 0.0

    def ensure_table_exists(self, table_id: str):
        """Create ``table_id`` in the configured dataset if it cannot be fetched.

        Args:
            table_id: Table name without the project/dataset prefix.

        Raises:
            NewsDataProcessorError: If the table cannot be created.
        """
        table_ref = f"{self.project_id}.{self.dataset_id}.{table_id}"
        try:
            self.client.get_table(table_ref)
            self.logger.info(f"Table {table_ref} already exists.")
        except Exception:
            # NOTE(review): any get_table failure (not only "not found") lands
            # here; a non-missing-table error will resurface from create_table.
            self.logger.info(f"Table {table_ref} does not exist. Creating it...")
            schema = [
                bigquery.SchemaField("ticker", "STRING"),
                bigquery.SchemaField("title", "STRING"),
                bigquery.SchemaField("summary", "STRING"),
                bigquery.SchemaField("publisher", "STRING"),
                bigquery.SchemaField("link", "STRING"),
                bigquery.SchemaField("publish_date", "TIMESTAMP"),  # Use TIMESTAMP for ISO 8601 datetime
                bigquery.SchemaField("type", "STRING"),
                bigquery.SchemaField("related_tickers", "STRING"),
                bigquery.SchemaField("source", "STRING"),
                bigquery.SchemaField("lexical_diversity", "FLOAT"),
                bigquery.SchemaField("reliability_score", "FLOAT"),
                bigquery.SchemaField("textblob_sentiment", "FLOAT"),
                bigquery.SchemaField("vader_sentiment", "FLOAT"),
                bigquery.SchemaField("bert_sentiment", "FLOAT"),
                bigquery.SchemaField("bert_confidence", "FLOAT"),
                bigquery.SchemaField("word_count", "INTEGER"),
                bigquery.SchemaField("headline_sentiment", "FLOAT"),
            ]
            table = bigquery.Table(table_ref, schema=schema)
            try:
                self.client.create_table(table)
                self.logger.info(f"Table {table_ref} created successfully.")
            except Exception as e:
                self.logger.error(f"Failed to create table {table_ref}: {e}")
                raise NewsDataProcessorError(f"Table creation failed: {e}") from e

    def filter_existing_data(self, new_data: pd.DataFrame, target_table: str) -> pd.DataFrame:
        """
        Filter out rows that already exist in the target table based on publish_date.

        Rows are matched on ``publish_date`` formatted to second precision.
        ``new_data`` itself is not modified: the formatted dates live in a
        separate Series, so ``publish_date`` keeps its original dtype in both
        the input and the returned frame.

        Args:
            new_data (pd.DataFrame): Incoming new data to be checked for duplicates.
            target_table (str): Fully qualified BigQuery table reference (e.g., `trendsense.market_data.Market_News_History_2`).

        Returns:
            pd.DataFrame: Filtered dataframe with only new rows. If the lookup
            fails, the error is logged and ``new_data`` is returned unfiltered.
        """
        if new_data.empty:
            self.logger.info("No new data provided for filtering.")
            return new_data

        try:
            # String keys used only for the comparison with BigQuery.
            date_keys = pd.to_datetime(new_data['publish_date']).dt.strftime('%Y-%m-%dT%H:%M:%S')

            # Query to fetch existing publish dates
            existing_dates_query = f"""
            SELECT DISTINCT FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', publish_date) AS publish_date
            FROM `{target_table}`
            WHERE FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', publish_date) IN UNNEST(@publish_dates)
            """

            # Missing dates cannot match anything, so only distinct non-null
            # keys are sent as the array parameter.
            job_config = bigquery.QueryJobConfig(
                query_parameters=[
                    bigquery.ArrayQueryParameter('publish_dates', 'STRING', date_keys.dropna().unique().tolist())
                ]
            )

            # Execute query
            query_job = self.client.query(existing_dates_query, job_config=job_config)
            existing_dates = {row['publish_date'] for row in query_job}

            # Keep only rows whose key was not found in the target table.
            filtered_data = new_data[~date_keys.isin(existing_dates)]
            self.logger.info(f"Total new rows after filtering: {len(filtered_data)} (from {len(new_data)} original rows)")

            return filtered_data

        except Exception as e:
            self.logger.error(f"Error filtering existing data: {e}")
            return new_data

    def process_and_move_data(self, source_table_id: str, target_table_id: str, batch_size: int = 1000) -> Dict[str, Any]:
        """Read rows from the source table, enrich them, and append them to the target table.

        Args:
            source_table_id: Source table name (without project/dataset prefix).
            target_table_id: Target table name (without project/dataset prefix).
            batch_size: Maximum number of source rows to read (SQL ``LIMIT``).

        Returns:
            A dict with ``status`` ("success" or "error"), ``message`` and
            ``rows_processed``. Errors are logged and reported in the dict
            rather than raised.
        """
        source_table = f"{self.project_id}.{self.dataset_id}.{source_table_id}"
        target_table = f"{self.project_id}.{self.dataset_id}.{target_table_id}"

        try:
            # Query data from the source table excluding unwanted columns
            self.logger.info(f"Querying source table: {source_table}")
            # int() guarantees that only a number is interpolated into the SQL.
            query = f"""
            SELECT
                ticker,
                title,
                summary,
                publisher,
                link,
                publish_date,
                type,
                related_tickers,
                source,
                lexical_diversity,
                reliability_score,
                summary_sentiment
            FROM `{source_table}`
            LIMIT {int(batch_size)}
            """
            new_data = self.client.query(query).to_dataframe()
            self.logger.info(f"Rows retrieved from source table: {len(new_data)}")

            if new_data.empty:
                self.logger.info("No new data to process.")
                return {"status": "success", "message": "No new data", "rows_processed": 0}

            # Rename summary_sentiment to textblob_sentiment
            self.logger.info("Renaming columns...")
            new_data.rename(columns={"summary_sentiment": "textblob_sentiment"}, inplace=True)

            # Ensure publish_date is in datetime format
            new_data['publish_date'] = pd.to_datetime(new_data['publish_date'])

            # Filter out existing rows. The lookup needs the fully qualified
            # table name; the bare table id makes the query fail, which
            # silently disables de-duplication.
            self.logger.info("Filtering existing rows...")
            # .copy() so the column assignments below act on an independent frame.
            new_data = self.filter_existing_data(new_data, target_table).copy()
            self.logger.info(f"Rows remaining after filtering: {len(new_data)}")

            if new_data.empty:
                self.logger.info("No new unique rows to process after filtering.")
                return {"status": "success", "message": "No new unique rows", "rows_processed": 0}

            # Word Count Calculation
            self.logger.info("Calculating word count for summaries...")
            new_data["word_count"] = new_data["summary"].fillna("").apply(lambda x: len(str(x).split()))

            # Headline Sentiment using VADER
            self.logger.info("Performing VADER sentiment analysis on headlines...")
            new_data["headline_sentiment"] = new_data["title"].apply(self.calculate_vader_sentiment)

            # Summary sentiment using VADER
            self.logger.info("Performing VADER sentiment analysis on summaries...")
            new_data["vader_sentiment"] = new_data["summary"].apply(self.calculate_vader_sentiment)

            # Summary sentiment using BERT: one (score, confidence) pair per row.
            self.logger.info("Performing BERT sentiment analysis on summaries...")
            bert_results = new_data["summary"].apply(self.calculate_bert_sentiment).tolist()
            new_data["bert_sentiment"] = [score for score, _ in bert_results]
            new_data["bert_confidence"] = [confidence for _, confidence in bert_results]

            # Load data into the target table
            self.logger.info("Loading data into BigQuery...")
            job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
            job = self.client.load_table_from_dataframe(new_data, target_table, job_config=job_config)
            job.result()  # Wait for the job to complete

            success_msg = f"Data successfully moved to {target_table}. Rows added: {len(new_data)}"
            self.logger.info(success_msg)

            return {"status": "success", "message": success_msg, "rows_processed": len(new_data)}

        except Exception as e:
            error_msg = f"Error processing data: {e}"
            self.logger.error(error_msg)
            return {"status": "error", "message": error_msg, "rows_processed": 0}


def move_market_news_data(request):
    """
    Google Cloud Function entry point to process and move market news data.

    ``request`` is accepted but not read. Project, dataset and table names
    come from environment variables, with built-in defaults.

    Returns:
        dict: ``{'statusCode': ..., 'body': ...}``. ``statusCode`` is 200 when
        the processor reports success and 500 otherwise; ``body`` is the
        processor's result dict, or a ``status``/``message`` error dict when
        an exception is raised.
    """
    # Configuration: environment variable name -> fallback value.
    project = os.getenv('GCP_PROJECT_ID', 'trendsense')
    dataset = os.getenv('BQ_DATASET_ID', 'market_data')
    source_name = os.getenv('SOURCE_TABLE_ID', 'Market_News_History_New')
    target_name = os.getenv('TARGET_TABLE_ID', 'Market_News_History_2')

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    try:
        # The target table must exist before rows are appended to it.
        processor = NewsDataProcessor(project, dataset)
        processor.ensure_table_exists(target_name)

        outcome = processor.process_and_move_data(source_name, target_name)

        if outcome['status'] == 'success':
            status_code = 200
        else:
            status_code = 500
        return {'statusCode': status_code, 'body': outcome}

    except Exception as e:
        logging.error(f"Failed to process market news data: {e}")
        failure_body = {'status': 'error', 'message': str(e)}
        return {'statusCode': 500, 'body': failure_body}


In [11]:
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
from textblob import TextBlob

# NewsAPI credential sent with every request made by get_market_news().
# NOTE(review): this key is hard-coded in the notebook; prefer loading it from
# an environment variable or secret store, and rotate this value.
api_key = 'afc3fe9ac08745439bf521cb5b974fbc'

# Specify the ticker you want to test
ticker = 'AAPL'

# Ten-day window ending now (UTC), formatted as YYYY-MM-DD for the API.
# datetime.now(timezone.utc) replaces datetime.utcnow(), which is deprecated
# since Python 3.12 and returned a naive datetime.
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=10)
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')

# Function for TextBlob sentiment analysis
def textblob_sentiment(text):
    """Return the TextBlob sentiment polarity of ``text`` (from -1 to 1).

    Falsy input (``None`` or an empty string) is not analysed and yields 0.
    """
    if not text:
        return 0
    blob = TextBlob(text)
    return blob.sentiment.polarity

# Function to fetch market news for a date range
def get_market_news(ticker, start_date, end_date, max_retries=1, retry_delay=5, timeout=10):
    """Fetch NewsAPI articles matching ``ticker`` within a date range.

    Args:
        ticker: Search term sent as the ``q`` query parameter.
        start_date: Value for the ``from`` query parameter.
        end_date: Value for the ``to`` query parameter.
        max_retries: How many times to retry after an HTTP 429 response.
        retry_delay: Seconds to sleep before each retry.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The list under the response's ``articles`` key, or an empty list when
        the request fails, is still rate limited after the retries, or the
        response carries no articles.
    """
    # Let requests build the query string so special characters are URL-encoded.
    params = {
        'q': ticker,
        'from': start_date,
        'to': end_date,
        'sortBy': 'publishedAt',
        'apiKey': api_key,
    }

    for attempt in range(max_retries + 1):
        try:
            # A timeout keeps one stalled connection from hanging the run.
            response = requests.get('https://newsapi.org/v2/everything', params=params, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Error fetching data for {ticker}: {exc}")
            return []

        if response.status_code == 200:
            # 'articles' may be missing or null; normalise both to an empty list.
            return response.json().get('articles') or []

        if response.status_code != 429:
            print(f"Error fetching data for {ticker}: {response.status_code}")
            return []

        if attempt < max_retries:
            print(f"Rate limit exceeded for {ticker}, retrying after delay...")
            time.sleep(retry_delay)

    print(f"Rate limit exceeded for {ticker}, giving up after {max_retries} retries.")
    return []

# Main function to fetch news and save to CSV
def main():
    """Fetch news for the configured ticker and date range and save it as CSV.

    Each article becomes one row carrying the TextBlob polarity of its
    description. Nothing is written when no articles are returned.
    """
    print(f"Fetching news for {ticker} from {start_date_str} to {end_date_str}...")
    fetched = get_market_news(ticker, start_date_str, end_date_str)

    rows = []
    for item in fetched:
        description = item.get('description', '')
        # Key order below determines the CSV column order.
        rows.append({
            'ticker': ticker,
            'title': item.get('title', ''),
            'summary': description,
            'summary_textblob_sentiment': textblob_sentiment(description),
            'publisher': item.get('source', {}).get('name', ''),
            'link': item.get('url', ''),
            'publish_date': item.get('publishedAt', ''),
            'source': 'NewsAPI',  # marks where the row came from
        })

    frame = pd.DataFrame(rows)

    if frame.empty:
        print("No news articles found for the specified date range.")
        return

    # The file name encodes the ticker and the requested date range.
    output_file = f"{ticker}_news_{start_date_str}_to_{end_date_str}.csv"
    frame.to_csv(output_file, index=False)
    print(f"Data successfully saved to {output_file}")

# Call main() only when __name__ is "__main__", i.e. not when imported as a module.
if __name__ == "__main__":
    main()

      

  end_date = datetime.utcnow()


Fetching news for AAPL from 2024-11-30 to 2024-12-10...
Data successfully saved to AAPL_news_2024-11-30_to_2024-12-10.csv
