# 1. Requirements

In [0]:
import os
import subprocess
import sys

# Install the project requirements into the interpreter that runs this notebook.
# `sys.executable -m pip` guarantees the packages land in the kernel's own
# environment, and check=True stops the notebook here if the install fails
# (os.system silently ignored pip's exit status).
REQUIREMENTS_URL = "https://raw.githubusercontent.com/George-Michael-Dagogo/World_news_tutorial/main/requirements.txt"
subprocess.run([sys.executable, "-m", "pip", "install", "-r", REQUIREMENTS_URL], check=True)

# Restart the Python process so the freshly installed packages can be imported.
# NOTE(review): the restart is expected to discard Python state defined in this
# cell, so later cells must do their own imports -- confirm against Databricks docs.
dbutils.library.restartPython()

# 2. Load articles from News API

In [0]:
# Import necessary libraries
from newsapi.newsapi_client import NewsApiClient
import pandas as pd
from nltk.corpus import stopwords
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from datetime import date, timedelta

def extract_transform_function(api_key=None):
    """Fetch English entertainment top headlines and keep the recent ones.

    Requests one page (page size 90) of top headlines from News API, flattens
    the articles into a DataFrame and keeps only those published yesterday or
    the day before yesterday, measured in UTC calendar days.

    Args:
        api_key: News API key. When omitted, the key is read from the
            ``NEWS_API_KEY`` environment variable so that no credential has
            to be stored in the notebook.

    Returns:
        pandas.DataFrame with the columns ``source``, ``title``,
        ``date_posted``, ``author`` and ``url``. It is empty when no article
        falls inside the two-day window.

    Raises:
        ValueError: If no API key is passed and ``NEWS_API_KEY`` is not set.
    """
    # Imported locally so the function does not depend on an `import os`
    # executed in some other cell.
    import os

    if api_key is None:
        api_key = os.environ.get('NEWS_API_KEY')
    if not api_key:
        raise ValueError(
            "No News API key configured: pass api_key=... or set the "
            "NEWS_API_KEY environment variable."
        )

    # The publication timestamps are parsed as UTC below, so the date window
    # is computed in UTC as well instead of the machine's local calendar.
    today = pd.Timestamp.now(tz='UTC').date()
    yesterday = today - timedelta(days=1)
    day_before_yesterday = today - timedelta(days=2)

    newsapi = NewsApiClient(api_key=api_key)

    # Top headlines for the entertainment category in English
    top_headlines = newsapi.get_top_headlines(
        category='entertainment',
        language='en',
        page_size=90,
        page=1,
    )

    # A response without an 'articles' entry yields an empty frame
    articles = top_headlines.get('articles', [])

    # Keep only the fields of interest; missing keys become NaN columns
    init_df = pd.DataFrame(articles, columns=['source', 'title', 'publishedAt', 'author', 'url'])

    # 'source' is a nested dict; keep its 'name' and map anything else to None
    init_df['source'] = init_df['source'].apply(
        lambda src: src.get('name') if isinstance(src, dict) else None
    )

    # utc=True gives one consistent tz-aware dtype even if offsets are mixed
    init_df['publishedAt'] = pd.to_datetime(init_df['publishedAt'], utc=True)

    published_day = init_df['publishedAt'].dt.date
    in_window = published_day.isin([day_before_yesterday, yesterday])

    # Non-inplace rename followed by copy(): renaming a filtered slice in place
    # triggers SettingWithCopyWarning and may not modify the intended object.
    return (
        init_df.loc[in_window]
        .rename(columns={'publishedAt': 'date_posted'})
        .copy()
    )

# 3. Parse article content from URL

In [0]:
from bs4 import BeautifulSoup
import requests

# Function to parse article content from url
def full_content(url, timeout=10):
    """Download a page and return the text of all its ``<p>`` elements.

    Args:
        url: Address of the article to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a single unresponsive host would block the whole run.

    Returns:
        The paragraph texts joined by single spaces, or the sentinel string
        ``'couldnt retrieve'`` when downloading or parsing fails.
    """
    # Browser-like User-Agent sent with the request
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        # Turn HTTP 4xx/5xx answers into exceptions so they hit the handler below
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        article_text = ' '.join([p.text for p in soup.find_all('p')])
        return article_text
    except Exception as e:
        # Deliberately best-effort: one bad URL must not abort the batch.
        # Callers filter on the sentinel value returned here.
        print(f"Error retrieving content from {url}: {e}")
        return 'couldnt retrieve'

# 4. Metrics for article content

## 4.1 Count words

In [0]:
# Function to count words in a text excluding stopwords
def count_words_without_stopwords(text):
    """Count the word tokens in ``text`` that are not English stopwords.

    Args:
        text: Text to analyse. ``bytes`` are decoded as UTF-8, with
            undecodable bytes replaced.

    Returns:
        The number of tokens that contain at least one letter or digit and
        are not in NLTK's English stopword list. Returns 0 when ``text`` is
        neither ``str`` nor ``bytes``.
    """
    if isinstance(text, bytes):
        # str(b'...') would produce the "b'...'" repr and tokenize that
        # instead of the actual text, so decode explicitly.
        text = text.decode('utf-8', errors='replace')
    elif not isinstance(text, str):
        return 0

    stop_words = set(stopwords.words('english'))
    return sum(
        1
        for token in nltk.word_tokenize(text)
        # Pure punctuation tokens (",", ".", "``") are not words
        if any(ch.isalnum() for ch in token) and token.lower() not in stop_words
    )

## 4.2 Get sentiment

In [0]:
# Function to get sentiment and compound score for a given text
def get_sentiment(row, sid):
    """Label a text as Positive, Negative or Neutral from its compound score.

    Args:
        row: Text to score.
        sid: Object whose ``polarity_scores(text)`` returns a mapping with a
            ``'compound'`` entry.

    Returns:
        Tuple ``(label, compound)`` where ``label`` is ``'Positive'`` for a
        compound score of at least 0.05, ``'Negative'`` for at most -0.05 and
        ``'Neutral'`` otherwise.
    """
    compound = sid.polarity_scores(row)['compound']

    # Guard clauses, evaluated in the order positive -> negative -> neutral
    if compound >= 0.05:
        return 'Positive', compound
    if compound <= -0.05:
        return 'Negative', compound
    return 'Neutral', compound

# 5. Run and create a result DataFrame

In [0]:
def run():
    """Run the pipeline: fetch headlines, scrape content, add text metrics.

    Returns:
        pandas.DataFrame holding the columns produced by
        ``extract_transform_function`` plus ``content``, ``word_count``,
        ``sentiment`` and ``compound_score``. Rows whose content could not be
        retrieved are dropped; the frame may be empty.
    """
    articles = extract_transform_function()

    # Scrape every URL and flatten newlines into spaces (literal replacement)
    content = articles['url'].apply(full_content).str.replace('\n', ' ', regex=False)

    # Drop failed downloads (full_content's sentinel). The explicit copy()
    # makes the column assignments below write to an independent frame rather
    # than to a filtered view (avoids SettingWithCopyWarning).
    df = articles.assign(content=content)
    df = df[df['content'] != 'couldnt retrieve'].copy()

    # NLTK resources for tokenizing, stopword filtering and VADER scoring.
    # 'wordnet' is not used by the code in this notebook; it is still
    # downloaded to keep the original side effects unchanged.
    for resource in ('stopwords', 'punkt', 'wordnet', 'vader_lexicon'):
        nltk.download(resource)

    df['word_count'] = df['content'].apply(count_words_without_stopwords)

    sid = SentimentIntensityAnalyzer()

    # Build both sentiment columns from a list of (label, score) tuples.
    # The previous apply(lambda x: pd.Series(...)) returns a plain Series for
    # an empty frame, which can raise when assigned to two columns at once.
    scores = [get_sentiment(text, sid) for text in df['content'].astype(str)]
    sentiment_df = pd.DataFrame(scores, columns=['sentiment', 'compound_score'], index=df.index)
    df['sentiment'] = sentiment_df['sentiment']
    df['compound_score'] = sentiment_df['compound_score'].astype(float)

    return df

In [0]:
# Execute the whole pipeline once; `df` is reused by the preview cell and by
# the Spark write in section 6.2.
df = run()

In [0]:
# Preview the first rows of the enriched articles instead of the whole frame.
display(df.head())

# 6. Store data

## 6.1 Create schema and table

In [0]:
 %sql
-- Database and table that receive the enriched articles written in section 6.2
CREATE DATABASE IF NOT EXISTS the_news;

CREATE TABLE IF NOT EXISTS the_news.news_table (
  source          STRING,
  title           STRING,
  date_posted     DATE,
  author          STRING,
  url             STRING,
  content         STRING,
  word_count      INT,
  sentiment       STRING,
  compound_score  DOUBLE
)

## 6.2 Write data

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, DoubleType

# Reuse the active Spark session, or create one when none exists
spark = SparkSession.builder.appName("CreateTableExample").getOrCreate()

# Explicit schema mirroring the columns of the_news.news_table
schema = StructType([
    StructField("source", StringType(), True),
    StructField("title", StringType(), True),
    StructField("date_posted", DateType(), True),
    StructField("author", StringType(), True),
    StructField("url", StringType(), True),
    StructField("content", StringType(), True),
    StructField("word_count", IntegerType(), True),
    StructField("sentiment", StringType(), True),
    StructField("compound_score", DoubleType(), True)
])

# `date_posted` holds full timestamps while the schema declares a DATE, so
# convert it to plain dates explicitly rather than relying on Spark's implicit
# timestamp handling. Selecting the columns by schema name keeps the mapping
# onto the schema correct even if the pandas column order changes.
# A new frame is built so the notebook-level `df` is left untouched.
df_to_store = df.assign(date_posted=df['date_posted'].dt.date)[
    [field.name for field in schema.fields]
]

spark_df = spark.createDataFrame(df_to_store, schema=schema)

# NOTE(review): 'append' adds these rows on every execution, so re-running the
# cell stores the same articles again -- confirm whether that is intended.
spark_df.write.mode('append').saveAsTable('the_news.news_table')